blob: b7b1af012b597ab2c8348f99bfb81696272b918d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.offheap;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.BytesAndBitsForCompactor;
import org.apache.geode.internal.cache.EntryBits;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.RegionEntryContext;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DSCODE;
/**
* A class that stores a Java object in off-heap memory. See {@link AddressableMemoryManager} for
* how off-heap memory can be allocated, accessed, modified, and freed. Currently the object stored
* in this class is always an entry value of a Region. Note: this class has a natural ordering that
* is inconsistent with equals. Instances of this class should have a short lifetime. We do not
* store references to it in the cache. Instead the memoryAddress is stored in a primitive field in
* the cache and if used it will then, if needed, create an instance of this class.
*/
public class OffHeapStoredObject extends AbstractStoredObject
implements Comparable<OffHeapStoredObject>, MemoryBlock {
/**
* The memory address of the first byte of addressable memory that belongs to this object.
*/
private final long memoryAddress;
/**
* Size in bytes of the off-heap HEADER: two int fields, the chunk size (at
* {@code CHUNK_SIZE_OFFSET}) followed by the ref count word (at {@code REF_COUNT_OFFSET}). The
* useCount, chunkSize, dataSizeDelta, isSerialized, and isCompressed are all stored in addressable
* memory in this HEADER. This saves heap memory by using off heap.
*/
public static final int HEADER_SIZE = 4 + 4;
/**
* We need the smallest chunk to at least have enough room for a header and for an off-heap
* reference (which is a long).
*/
public static final int MIN_CHUNK_SIZE = HEADER_SIZE + 8;
/**
* Offset, from the start of the chunk, of an int field holding the number of bytes in this chunk.
*/
private static final int CHUNK_SIZE_OFFSET = 0;
/**
* Offset, from the start of the chunk, of a volatile int field laid out as follows. The upper two
* bits are used for the isSerialized and isCompressed flags. The next three bits are unused. The
* lower 3 bits of the most significant byte contain a magic number that helps us detect if we are
* changing the ref count of an object that has been released. The next byte contains the
* dataSizeDelta: the number of bytes of logical data is always <= chunkSize, and since chunkSize
* never changes the data size is stored as a delta from it, whose max value would be
* HUGE_MULTIPLE-1. The lower two bytes contain the use count.
*/
static final int REF_COUNT_OFFSET = 4;
/**
* Flag bit of the ref count word that is set when the stored value is serialized. The upper two
* bits of the word are used for the isSerialized and isCompressed flags.
*/
static final int IS_SERIALIZED_BIT = 0x80000000;
// Flag bit of the ref count word that is set when the stored value is compressed.
static final int IS_COMPRESSED_BIT = 0x40000000;
// UNUSED 0x38000000
// Mask selecting the magic number bits of the ref count word.
static final int MAGIC_MASK = 0x07000000;
// Value the magic number bits hold while the chunk is allocated.
static final int MAGIC_NUMBER = 0x05000000;
// Mask selecting the dataSizeDelta byte of the ref count word.
static final int DATA_SIZE_DELTA_MASK = 0x00ff0000;
// Number of bits the dataSizeDelta is shifted left inside the ref count word.
static final int DATA_SIZE_SHIFT = 16;
// Mask selecting the use count (lower two bytes) of the ref count word.
static final int REF_COUNT_MASK = 0x0000ffff;
// Largest use count that fits in the masked bits; retain refuses to go beyond it.
static final int MAX_REF_COUNT = 0xFFFF;
// FILL_BYTE repeated eight times; validateFill compares memory against it one long at a time.
static final long FILL_PATTERN = 0x3c3c3c3c3c3c3c3cL;
// Byte value that fill writes into a chunk.
static final byte FILL_BYTE = 0x3c;
protected OffHeapStoredObject(long memoryAddress, int chunkSize) {
MemoryAllocatorImpl.validateAddressAndSize(memoryAddress, chunkSize);
this.memoryAddress = memoryAddress;
setSize(chunkSize);
AddressableMemoryManager.writeIntVolatile(getAddress() + REF_COUNT_OFFSET, MAGIC_NUMBER);
}
/**
 * Zeroes the whole ref count word of this chunk, which also clears the magic number.
 */
public void readyForFree() {
  final long headerWordAddress = getAddress() + REF_COUNT_OFFSET;
  AddressableMemoryManager.writeIntVolatile(headerWordAddress, 0);
}
/**
 * Switches the ref count word from 0 to the magic number using a conditional write that only
 * succeeds while the word still holds 0.
 *
 * @throws IllegalStateException if the conditional write reports failure; the message carries
 *         the value read back from the word
 */
public void readyForAllocation() {
  final boolean swapped = AddressableMemoryManager
      .writeIntVolatile(getAddress() + REF_COUNT_OFFSET, 0, MAGIC_NUMBER);
  if (swapped) {
    return;
  }
  final int found = AddressableMemoryManager.readIntVolatile(getAddress() + REF_COUNT_OFFSET);
  throw new IllegalStateException("Expected 0 but found " + Integer.toHexString(found));
}
/**
* Creates an instance whose memory address is 0. No off-heap memory is validated or touched.
* Should only be used by the FakeChunk subclass.
*/
protected OffHeapStoredObject() {
this.memoryAddress = 0L;
}
/**
* Used to create a Chunk given an existing, already allocated, memoryAddress. The off heap header
* has already been initialized, so this constructor only validates the address and remembers it;
* nothing is written to off-heap memory.
*
* @param memoryAddress address of the first byte of the existing chunk
*/
protected OffHeapStoredObject(long memoryAddress) {
MemoryAllocatorImpl.validateAddress(memoryAddress);
this.memoryAddress = memoryAddress;
}
/**
* Creates another instance that refers to the same memory address as the given chunk. The address
* is copied as is; it is not validated and the off-heap header is not touched.
*
* @param chunk the object whose memory address is shared
*/
protected OffHeapStoredObject(OffHeapStoredObject chunk) {
this.memoryAddress = chunk.memoryAddress;
}
/**
 * Hands this object to the wrapper as its off-heap data. When this value is serialized the
 * serialized flag is first turned on in the user bits; otherwise they are passed through as is.
 *
 * @param wrapper the holder that receives this object and the user bits
 * @param userBits the caller's entry bits
 */
@Override
public void fillSerializedValue(BytesAndBitsForCompactor wrapper, byte userBits) {
  final byte bitsToStore = isSerialized() ? EntryBits.setSerialized(userBits, true) : userBits;
  wrapper.setOffHeapData(this, bitsToStore);
}
/**
 * Returns the name of this object's runtime class without its package prefix. Nested classes
 * keep their enclosing class in the result (for example {@code Outer$Inner}).
 *
 * @return the class name with everything up to and including the last '.' removed
 */
String getShortClassName() {
  final String qualifiedName = getClass().getName();
  // Cut at the last '.' instead of asking getPackage(): that call may return null, and for a
  // class without a package "package length + 1" would chop off the first character of the name.
  // lastIndexOf yields -1 when there is no '.', so the whole name is returned in that case.
  return qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1);
}
/**
 * Compares the data bytes of this object with those of another stored object. The result is
 * false when the serialized flags or the value sizes differ, or when the other object is not an
 * {@code OffHeapStoredObject}. Otherwise the bytes are compared through two 1024 byte buffers
 * so that neither value has to be copied to the heap as a whole.
 *
 * @param so the stored object to compare with
 * @return true if both objects are the same instance, share the same address, or hold the same
 *         bytes
 */
@Override
public boolean checkDataEquals(@Unretained StoredObject so) {
  if (this == so) {
    return true;
  }
  if (isSerialized() != so.isSerialized()) {
    return false;
  }
  final int byteCount = getValueSizeInBytes();
  if (byteCount != so.getValueSizeInBytes()) {
    return false;
  }
  if (!(so instanceof OffHeapStoredObject)) {
    return false;
  }
  final OffHeapStoredObject that = (OffHeapStoredObject) so;
  if (getAddress() == that.getAddress()) {
    return true;
  }
  final byte[] mine = new byte[1024];
  final byte[] theirs = new byte[mine.length];
  // Two reads are recorded because two different objects are being read.
  MemoryAllocatorImpl.getAllocator().getStats().incReads();
  MemoryAllocatorImpl.getAllocator().getStats().incReads();
  int position = 0;
  // Compare whole buffers while at least one full buffer of data remains.
  while (byteCount - position >= mine.length) {
    this.readDataBytes(position, mine);
    that.readDataBytes(position, theirs);
    for (int k = 0; k < mine.length; k++) {
      if (mine[k] != theirs[k]) {
        return false;
      }
    }
    position += mine.length;
  }
  // Whatever is left is shorter than one buffer.
  final int tail = byteCount - position;
  if (tail > 0) {
    this.readDataBytes(position, mine, 0, tail);
    that.readDataBytes(position, theirs, 0, tail);
    for (int k = 0; k < tail; k++) {
      if (mine[k] != theirs[k]) {
        return false;
      }
    }
  }
  return true;
}
/**
 * Compares the data bytes of this object with the given byte array, reading the off-heap data
 * through a 1024 byte buffer. The caller is responsible for checking isSerialized.
 *
 * @param serializedObj the bytes to compare with
 * @return true if the lengths match and every byte is equal
 */
@Override
public boolean checkDataEquals(byte[] serializedObj) {
  final int byteCount = getValueSizeInBytes();
  if (byteCount != serializedObj.length) {
    return false;
  }
  final byte[] buffer = new byte[1024];
  MemoryAllocatorImpl.getAllocator().getStats().incReads();
  int position = 0;
  // Compare whole buffers while at least one full buffer of data remains.
  while (byteCount - position >= buffer.length) {
    this.readDataBytes(position, buffer);
    for (int k = 0; k < buffer.length; k++) {
      if (buffer[k] != serializedObj[position + k]) {
        return false;
      }
    }
    position += buffer.length;
  }
  // Whatever is left is shorter than one buffer.
  final int tail = byteCount - position;
  if (tail > 0) {
    this.readDataBytes(position, buffer, 0, tail);
    for (int k = 0; k < tail; k++) {
      if (buffer[k] != serializedObj[position + k]) {
        return false;
      }
    }
  }
  return true;
}
/**
 * Verifies that the magic number is still present in this chunk's ref count word.
 *
 * @throws IllegalStateException if the magic number bits do not match, which is taken to mean
 *         the memory was already freed
 */
public void checkIsAllocated() {
  final int headerWord =
      AddressableMemoryManager.readIntVolatile(memoryAddress + REF_COUNT_OFFSET);
  if ((headerWord & MAGIC_MASK) == MAGIC_NUMBER) {
    return;
  }
  throw new IllegalStateException("It looks like this off heap memory was already freed. rawBits="
      + Integer.toHexString(headerWord));
}
/**
 * Adds the given amount to the chunk size stored in the header.
 *
 * @param inc number of bytes to add to the current size
 */
public void incSize(int inc) {
  final int enlarged = getSize() + inc;
  setSize(enlarged);
}
/**
* Does nothing in this class. NOTE(review): presumably a hook for subclasses that runs before the
* chunk is handed back to the allocator; confirm against the callers.
*/
protected void beforeReturningToAllocator() {
}
/**
 * Reads the chunk size stored in this object's off-heap header.
 *
 * @return the value of the header's chunk size field
 */
@Override
public int getSize() {
  return getSize(memoryAddress);
}
/**
 * Stores a new chunk size in this object's off-heap header.
 *
 * @param size the value to write to the header's chunk size field
 */
public void setSize(int size) {
  setSize(memoryAddress, size);
}
/**
 * Returns the address this object was created with.
 *
 * @return the memory address of the first byte of this chunk
 */
@Override
public long getAddress() {
  return memoryAddress;
}
/**
 * Computes the data size of this chunk from its off-heap header.
 *
 * @return the chunk size minus the dataSizeDelta stored in the ref count word
 */
@Override
public int getDataSize() {
  return getDataSize(memoryAddress);
}
/**
 * Computes the data size of the chunk at the given address: the dataSizeDelta is extracted from
 * the ref count word and subtracted from the chunk size.
 *
 * @param memoryAdress address of the first byte of the chunk
 * @return the chunk size minus the stored dataSizeDelta
 */
protected static int getDataSize(long memoryAdress) {
  final int headerWord = AddressableMemoryManager.readInt(memoryAdress + REF_COUNT_OFFSET);
  final int unusedBytes = (headerWord & DATA_SIZE_DELTA_MASK) >> DATA_SIZE_SHIFT;
  return getSize(memoryAdress) - unusedBytes;
}
/**
 * Returns the address of the first byte that follows the off-heap header.
 *
 * @return this chunk's memory address plus {@code HEADER_SIZE}
 */
protected long getBaseDataAddress() {
  return memoryAddress + HEADER_SIZE;
}
/**
* Always returns 0 in this class.
*
* @return 0
*/
protected int getBaseDataOffset() {
return 0;
}
/**
 * Asks the AddressableMemoryManager for a direct ByteBuffer that starts at this object's base
 * data address and spans its data size.
 *
 * @return whatever the memory manager returns; callers in this class check it for null
 */
@Override
@Unretained
public ByteBuffer createDirectByteBuffer() {
  final long dataStart = getBaseDataAddress();
  final int dataLength = getDataSize();
  return AddressableMemoryManager.createDirectByteBuffer(dataStart, dataLength);
}
/**
 * Writes this value to the given output. When the value is not compressed, the output is a
 * HeapDataOutputStream and a direct ByteBuffer is available, the bytes are written straight
 * from that buffer: serialized values are written as is, anything else is preceded by the
 * BYTE_ARRAY code and the array length. In every other case the superclass implementation is
 * used.
 *
 * @param out the destination
 * @throws IOException if writing fails
 */
@Override
public void sendTo(DataOutput out) throws IOException {
  if (this.isCompressed() || !(out instanceof HeapDataOutputStream)) {
    super.sendTo(out);
    return;
  }
  final ByteBuffer data = createDirectByteBuffer();
  if (data == null) {
    super.sendTo(out);
    return;
  }
  final HeapDataOutputStream hdos = (HeapDataOutputStream) out;
  if (!this.isSerialized()) {
    // A raw byte[] value has to be framed so that it reads back as a byte array.
    hdos.writeByte(DSCODE.BYTE_ARRAY.toByte());
    InternalDataSerializer.writeArrayLength(data.remaining(), hdos);
  }
  hdos.write(data);
}
/**
 * Writes this value's bytes to the given output as a length-prefixed array. When the value is
 * not compressed, the output is a HeapDataOutputStream and a direct ByteBuffer is available,
 * the length and bytes are written straight from that buffer; otherwise the superclass
 * implementation is used.
 *
 * @param out the destination
 * @throws IOException if writing fails
 */
@Override
public void sendAsByteArray(DataOutput out) throws IOException {
  if (isCompressed() || !(out instanceof HeapDataOutputStream)) {
    super.sendAsByteArray(out);
    return;
  }
  final ByteBuffer data = createDirectByteBuffer();
  if (data == null) {
    super.sendAsByteArray(out);
    return;
  }
  final HeapDataOutputStream hdos = (HeapDataOutputStream) out;
  InternalDataSerializer.writeArrayLength(data.remaining(), hdos);
  hdos.write(data);
}
/**
 * Returns an address that can be used with AddressableMemoryManager to access this object's
 * data. The range checks are assertions, so they only run when assertions are enabled.
 *
 * @param offset the offset, from the first data byte, of the byte the returned address should
 *        point to. Must be >= 0.
 * @param size the number of bytes that will be read using the returned address; used to assert
 *        that all the memory accessed belongs to this object's data. Must be > 0.
 * @return the base data address plus {@code offset}
 */
@Override
public long getAddressForReadingData(int offset, int size) {
  assert offset >= 0 && offset + size <= getDataSize() : "Offset=" + offset + ",size=" + size
      + ",dataSize=" + getDataSize() + ", chunkSize=" + getSize()
      + ", but offset + size must be <= " + getDataSize();
  assert size > 0;
  return getBaseDataAddress() + offset;
}
/**
 * Reads one byte of this object's data.
 *
 * @param offset offset of the byte from the first data byte; asserted to lie within
 *        {@code [0, getDataSize())}
 * @return the byte stored at that offset
 */
@Override
public byte readDataByte(int offset) {
  // The lower bound matters too: a negative offset would read the header or memory in front of
  // this chunk.
  assert offset >= 0 && offset < getDataSize();
  return AddressableMemoryManager.readByte(getBaseDataAddress() + offset);
}
/**
 * Writes one byte into this object's data.
 *
 * @param offset offset of the byte from the first data byte; asserted to lie within
 *        {@code [0, getDataSize())}
 * @param value the byte to store
 */
@Override
public void writeDataByte(int offset, byte value) {
  // The lower bound matters too: a negative offset would overwrite the header or memory in
  // front of this chunk.
  assert offset >= 0 && offset < getDataSize();
  AddressableMemoryManager.writeByte(getBaseDataAddress() + offset, value);
}
/**
* Fills the whole given array with data bytes starting at the given data offset. Delegates to the
* four argument overload with an array offset of 0 and a size of {@code bytes.length}.
*
* @param offset offset, from the first data byte, at which reading starts
* @param bytes the array to fill
*/
@Override
public void readDataBytes(int offset, byte[] bytes) {
readDataBytes(offset, bytes, 0, bytes.length);
}
/**
* Writes the whole given array into this object's data starting at the given data offset.
* Delegates to the four argument overload with an array offset of 0 and a size of
* {@code bytes.length}.
*
* @param offset offset, from the first data byte, at which writing starts
* @param bytes the bytes to store
*/
@Override
public void writeDataBytes(int offset, byte[] bytes) {
writeDataBytes(offset, bytes, 0, bytes.length);
}
/**
 * Copies data bytes of this object into part of the given array.
 *
 * @param offset offset, from the first data byte, at which reading starts; asserted to be >= 0
 * @param bytes the destination array
 * @param bytesOffset index in {@code bytes} of the first byte written
 * @param size number of bytes to copy; asserted to be >= 0 and to keep {@code offset + size}
 *        within the data size
 */
@Override
public void readDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
  // Checking the sum alone lets a negative offset or size (or an overflowed sum) slip through.
  assert offset >= 0 && size >= 0 && offset + size <= getDataSize();
  AddressableMemoryManager.readBytes(getBaseDataAddress() + offset, bytes, bytesOffset, size);
}
/**
 * Copies part of the given array into this object's data.
 *
 * @param offset offset, from the first data byte, at which writing starts; asserted to be >= 0
 * @param bytes the source array
 * @param bytesOffset index in {@code bytes} of the first byte read
 * @param size number of bytes to copy; asserted to be >= 0 and to keep {@code offset + size}
 *        within the data size
 */
@Override
public void writeDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
  // Checking the sum alone lets a negative offset or size (or an overflowed sum) slip through.
  assert offset >= 0 && size >= 0 && offset + size <= getDataSize();
  AddressableMemoryManager.writeBytes(getBaseDataAddress() + offset, bytes, bytesOffset, size);
}
/**
 * Releases one use of this chunk by delegating to the static {@code release} with this object's
 * memory address.
 */
@Override
public void release() {
  release(memoryAddress);
}
/**
 * Orders chunks by size and, for chunks of the same size, by memory address. The address only
 * serves as a tie breaker so that 0 is returned solely for chunks that are identical.
 *
 * @param o the chunk to compare with
 * @return a negative value, zero, or a positive value as this chunk sorts before, equal to, or
 *         after the given one
 */
@Override
public int compareTo(OffHeapStoredObject o) {
  // Compare directly instead of taking the sign of a difference: a subtraction can overflow and
  // then reports the wrong order.
  int result = Integer.compare(getSize(), o.getSize());
  if (result == 0) {
    // For the same sized chunks we really don't care about their order
    // but we need compareTo to only return 0 if the two chunks are identical
    result = Long.compare(getAddress(), o.getAddress());
  }
  return result;
}
/**
 * Two instances are equal when they refer to the same memory address.
 *
 * @param o the object to compare with
 * @return true if {@code o} is an {@code OffHeapStoredObject} with the same address
 */
@Override
public boolean equals(Object o) {
  if (!(o instanceof OffHeapStoredObject)) {
    return false;
  }
  final OffHeapStoredObject that = (OffHeapStoredObject) o;
  return getAddress() == that.getAddress();
}
/**
 * Hashes the memory address by folding its upper 32 bits into its lower 32 bits.
 *
 * @return the exclusive or of the two halves of the address
 */
@Override
public int hashCode() {
  final long address = this.getAddress();
  final int lowHalf = (int) address;
  final int highHalf = (int) (address >>> 32);
  return lowHalf ^ highHalf;
}
/**
* Copies the given bytes into this object's data, starting at data offset 0.
*
* @param value the bytes to store
*/
public void setSerializedValue(byte[] value) {
writeDataBytes(0, value);
}
/**
 * Copies this chunk's bytes to the heap and decompresses them with the context's compressor.
 * The decompression is bracketed by the context's startDecompression/endDecompression
 * statistics calls.
 *
 * @param context supplies the compressor and the cache performance statistics
 * @return the result of decompressing this chunk's bytes
 */
public byte[] getDecompressedBytes(RegionEntryContext context) {
  final byte[] compressed = getCompressedBytes();
  final long startTime = context.getCachePerfStats().startDecompression();
  final byte[] decompressed = context.getCompressor().decompress(compressed);
  context.getCachePerfStats().endDecompression(startTime);
  return decompressed;
}
/**
 * Returns the raw, possibly compressed, bytes of this chunk. A new array of the data size is
 * allocated and filled from off-heap memory, and one read is recorded in the allocator stats.
 *
 * @return a heap copy of this chunk's data bytes
 */
public byte[] getCompressedBytes() {
  final byte[] data = new byte[getDataSize()];
  readDataBytes(0, data);
  MemoryAllocatorImpl.getAllocator().getStats().incReads();
  return data;
}
/**
* Returns a heap copy of this object's data bytes. This method should only be called on
* uncompressed objects; that precondition is checked by an assertion only.
*
* @return byte array of the StoredObject value.
*/
protected byte[] getRawBytes() {
assert !isCompressed();
return getCompressedBytes();
}
/**
 * Returns this value in serialized form. A value stored serialized is returned as its raw
 * bytes; a value that is a plain byte[] is run through {@code EntryEventImpl.serialize} so that
 * the result looks like a serialized byte[].
 *
 * @return the serialized bytes of this value
 */
@Override
public byte[] getSerializedValue() {
  final byte[] raw = getRawBytes();
  return isSerialized() ? raw : EntryEventImpl.serialize(raw);
}
/**
 * Returns this value as an object. A value stored serialized is deserialized from its raw
 * bytes; otherwise the raw bytes themselves are the value. Neither parameter is used by this
 * implementation.
 *
 * @param r not used here
 * @param re not used here
 * @return the deserialized object, or the raw byte[] for a value that is not serialized
 */
@Override
public Object getDeserializedValue(Region r, RegionEntry re) {
  if (!isSerialized()) {
    return getRawBytes();
  }
  return EntryEventImpl.deserialize(getRawBytes());
}
/**
* Returns the chunk size. We want this to include memory overhead so use getSize() instead of
* getDataSize().
*
* @return the size of the whole chunk, off-heap header included
*/
@Override
public int getSizeInBytes() {
// Calling getSize includes the off heap header size.
// Nothing is added for the reference to this object: the size of that reference belongs to
// the region entry size, not to the size of this object.
return getSize();
}
/**
* Returns the data size of this chunk, as reported by {@code getDataSize()}.
*
* @return the number of data bytes
*/
@Override
public int getValueSizeInBytes() {
return getDataSize();
}
/**
 * Tests the isSerialized flag bit of this chunk's ref count word.
 *
 * @return true if the flag bit is set
 */
@Override
public boolean isSerialized() {
  final int headerWord = AddressableMemoryManager.readInt(memoryAddress + REF_COUNT_OFFSET);
  return (headerWord & IS_SERIALIZED_BIT) != 0;
}
/**
 * Tests the isCompressed flag bit of this chunk's ref count word.
 *
 * @return true if the flag bit is set
 */
@Override
public boolean isCompressed() {
  final int headerWord = AddressableMemoryManager.readInt(memoryAddress + REF_COUNT_OFFSET);
  return (headerWord & IS_COMPRESSED_BIT) != 0;
}
/**
 * Tries to add one use to this chunk by delegating to the static {@code retain} with this
 * object's memory address.
 *
 * @return the result of the static {@code retain}
 */
@Override
public boolean retain() {
  return retain(memoryAddress);
}
/**
 * Reads the use count stored in this chunk's ref count word.
 *
 * @return the current use count
 */
@Override
public int getRefCount() {
  return getRefCount(memoryAddress);
}
/**
 * Reads the chunk size field of the chunk at the given address, after validating the address.
 *
 * @param memAddr address of the first byte of the chunk
 * @return the int stored in the chunk size field
 */
public static int getSize(long memAddr) {
  MemoryAllocatorImpl.validateAddress(memAddr);
  final long sizeFieldAddress = memAddr + CHUNK_SIZE_OFFSET;
  return AddressableMemoryManager.readInt(sizeFieldAddress);
}
/**
 * Writes the chunk size field of the chunk at the given address, after validating the address
 * together with the size.
 *
 * @param memAddr address of the first byte of the chunk
 * @param size the value to store in the chunk size field
 */
public static void setSize(long memAddr, int size) {
  MemoryAllocatorImpl.validateAddressAndSize(memAddr, size);
  final long sizeFieldAddress = memAddr + CHUNK_SIZE_OFFSET;
  AddressableMemoryManager.writeInt(sizeFieldAddress, size);
}
/**
 * Reads the long stored directly behind the header of the chunk at the given address, after
 * validating the address.
 *
 * @param memAddr address of the first byte of the chunk
 * @return the long found at {@code memAddr + HEADER_SIZE}
 */
public static long getNext(long memAddr) {
  MemoryAllocatorImpl.validateAddress(memAddr);
  final long nextFieldAddress = memAddr + HEADER_SIZE;
  return AddressableMemoryManager.readLong(nextFieldAddress);
}
/**
 * Stores a long directly behind the header of the chunk at the given address, after validating
 * the address.
 *
 * @param memAddr address of the first byte of the chunk
 * @param next the value to write at {@code memAddr + HEADER_SIZE}
 */
public static void setNext(long memAddr, long next) {
  MemoryAllocatorImpl.validateAddress(memAddr);
  final long nextFieldAddress = memAddr + HEADER_SIZE;
  AddressableMemoryManager.writeLong(nextFieldAddress, next);
}
/**
 * Fills the chunk with a repeated byte fill pattern. The first {@code MIN_CHUNK_SIZE} bytes
 * (the header and the long that follows it) are left untouched; everything after them, up to
 * the chunk size, is set to {@code FILL_BYTE}.
 *
 * @param baseAddress the starting address for a {@link OffHeapStoredObject}.
 */
public static void fill(long baseAddress) {
  final long fillStart = baseAddress + MIN_CHUNK_SIZE;
  final int fillLength = getSize(baseAddress) - MIN_CHUNK_SIZE;
  AddressableMemoryManager.fill(fillStart, fillLength, FILL_BYTE);
}
/**
 * Validates that the fill pattern for this chunk has not been disturbed. Everything behind the
 * first {@code MIN_CHUNK_SIZE} bytes is read one long at a time and compared with
 * {@code FILL_PATTERN}. This method assumes the TINY_MULTIPLE is 8 bytes.
 *
 * @throws IllegalStateException when the pattern has been violated.
 */
public void validateFill() {
  assert FreeListManager.TINY_MULTIPLE == 8;
  final long filledStart = getAddress() + MIN_CHUNK_SIZE;
  final int filledLength = getSize() - MIN_CHUNK_SIZE;
  int offset = 0;
  while (offset < filledLength) {
    final long word = AddressableMemoryManager.readLong(filledStart + offset);
    if (word != FILL_PATTERN) {
      throw new IllegalStateException(
          "Fill pattern violated for chunk " + getAddress() + " with size " + getSize());
    }
    offset += FreeListManager.TINY_MULTIPLE;
  }
}
/**
 * Turns on the isSerialized flag in this chunk's ref count word. Passing false does nothing;
 * the flag is never cleared here. The word is re-read and the conditional write retried until
 * the write succeeds.
 *
 * @param isSerialized true to set the flag
 * @throws IllegalStateException if the magic number is missing from the ref count word
 */
public void setSerialized(boolean isSerialized) {
  if (!isSerialized) {
    return;
  }
  while (true) {
    final int current =
        AddressableMemoryManager.readIntVolatile(this.memoryAddress + REF_COUNT_OFFSET);
    if ((current & MAGIC_MASK) != MAGIC_NUMBER) {
      throw new IllegalStateException(
          "It looks like this off heap memory was already freed. rawBits="
              + Integer.toHexString(current));
    }
    final int updated = current | IS_SERIALIZED_BIT;
    if (AddressableMemoryManager.writeIntVolatile(this.memoryAddress + REF_COUNT_OFFSET,
        current, updated)) {
      return;
    }
  }
}
/**
 * Turns on the isCompressed flag in this chunk's ref count word. Passing false does nothing;
 * the flag is never cleared here. The word is re-read and the conditional write retried until
 * the write succeeds.
 *
 * @param isCompressed true to set the flag
 * @throws IllegalStateException if the magic number is missing from the ref count word
 */
public void setCompressed(boolean isCompressed) {
  if (!isCompressed) {
    return;
  }
  while (true) {
    final int current =
        AddressableMemoryManager.readIntVolatile(this.memoryAddress + REF_COUNT_OFFSET);
    if ((current & MAGIC_MASK) != MAGIC_NUMBER) {
      throw new IllegalStateException(
          "It looks like this off heap memory was already freed. rawBits="
              + Integer.toHexString(current));
    }
    final int updated = current | IS_COMPRESSED_BIT;
    if (AddressableMemoryManager.writeIntVolatile(this.memoryAddress + REF_COUNT_OFFSET,
        current, updated)) {
      return;
    }
  }
}
/**
 * Records the data size of this chunk. The size is stored as the difference between the chunk
 * size and {@code dataSize} in the dataSizeDelta bits of the ref count word; all other bits of
 * the word are kept. The word is re-read and the conditional write retried until the write
 * succeeds.
 *
 * @param dataSize the number of data bytes; asserted to be no larger than the chunk size and to
 *        leave a delta that fits in the dataSizeDelta bits
 * @throws IllegalStateException if the magic number is missing from the ref count word
 */
public void setDataSize(int dataSize) {
  assert dataSize <= getSize();
  final int unusedBytes = getSize() - dataSize;
  assert unusedBytes <= (DATA_SIZE_DELTA_MASK >> DATA_SIZE_SHIFT);
  final int deltaBits = unusedBytes << DATA_SIZE_SHIFT;
  while (true) {
    final int current =
        AddressableMemoryManager.readIntVolatile(this.memoryAddress + REF_COUNT_OFFSET);
    if ((current & MAGIC_MASK) != MAGIC_NUMBER) {
      throw new IllegalStateException(
          "It looks like this off heap memory was already freed. rawBits="
              + Integer.toHexString(current));
    }
    // Drop the old dataSizeDelta bits, then put the new delta in their place.
    final int updated = (current & ~DATA_SIZE_DELTA_MASK) | deltaBits;
    if (AddressableMemoryManager.writeIntVolatile(this.memoryAddress + REF_COUNT_OFFSET,
        current, updated)) {
      return;
    }
  }
}
/**
 * Raises the use count of this chunk from zero to one. The ref count word is re-read and the
 * conditional write retried until the write succeeds.
 *
 * @throws IllegalStateException if the magic number is missing from the ref count word, or if
 *         the use count is not zero
 */
public void initializeUseCount() {
  while (true) {
    final int current =
        AddressableMemoryManager.readIntVolatile(this.memoryAddress + REF_COUNT_OFFSET);
    if ((current & MAGIC_MASK) != MAGIC_NUMBER) {
      throw new IllegalStateException(
          "It looks like this off heap memory was already freed. rawBits="
              + Integer.toHexString(current));
    }
    final int useCount = current & REF_COUNT_MASK;
    if (useCount != 0) {
      throw new IllegalStateException("Expected use count to be zero but it was: " + useCount
          + " rawBits=0x" + Integer.toHexString(current));
    }
    if (AddressableMemoryManager.writeIntVolatile(this.memoryAddress + REF_COUNT_OFFSET,
        current, current + 1)) {
      return;
    }
  }
}
/**
 * Reads the use count of the chunk at the given address. The address is not validated and the
 * word is read with a plain, non-volatile read.
 *
 * @param memAddr address of the first byte of the chunk
 * @return the lower two bytes of the chunk's ref count word
 */
public static int getRefCount(long memAddr) {
  final int headerWord = AddressableMemoryManager.readInt(memAddr + REF_COUNT_OFFSET);
  return headerWord & REF_COUNT_MASK;
}
/**
 * Tries to add one use to the chunk at the given address. The ref count word is re-read and the
 * conditional write of "word + 1" retried until it succeeds, giving up after 1000 attempts.
 * When reference count tracking is enabled the new count is reported to ReferenceCountHelper.
 *
 * @param memAddr address of the first byte of the chunk
 * @return true if the use count was incremented; false if the magic number is missing or the
 *         use count is already zero
 * @throws IllegalStateException if the use count is already at its maximum, or if the write
 *         was attempted more than 1000 times
 */
public static boolean retain(long memAddr) {
  MemoryAllocatorImpl.validateAddress(memAddr);
  int useCount;
  int attempts = 0;
  while (true) {
    final int current = AddressableMemoryManager.readIntVolatile(memAddr + REF_COUNT_OFFSET);
    if ((current & MAGIC_MASK) != MAGIC_NUMBER) {
      // same as a use count of zero
      // TODO MAGIC_NUMBER rethink its use and interaction with compactor fragments
      return false;
    }
    useCount = current & REF_COUNT_MASK;
    if (useCount == MAX_REF_COUNT) {
      throw new IllegalStateException(
          "Maximum use count exceeded. rawBits=" + Integer.toHexString(current));
    }
    if (useCount == 0) {
      return false;
    }
    attempts++;
    if (attempts > 1000) {
      throw new IllegalStateException("tried to write " + (current + 1) + " to @"
          + Long.toHexString(memAddr) + " 1,000 times.");
    }
    if (AddressableMemoryManager.writeIntVolatile(memAddr + REF_COUNT_OFFSET, current,
        current + 1)) {
      break;
    }
  }
  if (ReferenceCountHelper.trackReferenceCounts()) {
    ReferenceCountHelper.refCountChanged(memAddr, false, useCount + 1);
  }
  return true;
}
/**
* Releases one use of the chunk at the given address. Delegates to the two argument overload with
* a null free list manager, which makes that overload look the manager up from the allocator
* when the chunk has to be freed.
*
* @param memAddr address of the first byte of the chunk
*/
public static void release(final long memAddr) {
release(memAddr, null);
}
/**
* Releases one use of the chunk at the given address. The ref count word is re-read and the
* conditional write retried until it succeeds. Dropping the last use zeroes the whole word and
* hands the chunk to the free list manager; otherwise the word is decremented by one. When
* reference count tracking is enabled the change is reported to ReferenceCountHelper.
*
* @param memAddr address of the first byte of the chunk
* @param freeListManager the manager that receives the chunk once its last use is released; when
*        null, the manager of the current allocator is used
* @throws IllegalStateException if the magic number is missing from the ref count word or the
*         use count is already zero
*/
static void release(final long memAddr, FreeListManager freeListManager) {
MemoryAllocatorImpl.validateAddress(memAddr);
int newCount;
int rawBits;
boolean returnToAllocator;
do {
// Reset on every attempt so a failed conditional write cannot leave a stale decision behind.
returnToAllocator = false;
rawBits = AddressableMemoryManager.readIntVolatile(memAddr + REF_COUNT_OFFSET);
if ((rawBits & MAGIC_MASK) != MAGIC_NUMBER) {
String msg = "It looks like off heap memory @" + Long.toHexString(memAddr)
+ " was already freed. rawBits=" + Integer.toHexString(rawBits) + " history="
+ ReferenceCountHelper.getFreeRefCountInfo(memAddr);
// debugLog(msg, true);
throw new IllegalStateException(msg);
}
int curCount = rawBits & REF_COUNT_MASK;
if ((curCount) == 0) {
// debugLog("too many frees @" + Long.toHexString(memAddr), true);
throw new IllegalStateException(
"Memory has already been freed." + " history=" + ReferenceCountHelper
.getFreeRefCountInfo(memAddr) /* + System.identityHashCode(this) */);
}
if (curCount == 1) {
newCount = 0; // clear the use count, bits, and the delta size since it will be freed.
returnToAllocator = true;
} else {
// newCount holds the complete header word, not just the count: the flag, magic and delta
// bits of rawBits are carried over unchanged.
newCount = rawBits - 1;
}
} while (!AddressableMemoryManager.writeIntVolatile(memAddr + REF_COUNT_OFFSET, rawBits,
newCount));
// debugLog("free deced ref count " + (newCount&USE_COUNT_MASK) + " @" +
// Long.toHexString(memAddr), true);
if (returnToAllocator) {
if (ReferenceCountHelper.trackReferenceCounts()) {
// The final decrement is only reported when tracking of freed reference counts is enabled.
if (ReferenceCountHelper.trackFreedReferenceCounts()) {
ReferenceCountHelper.refCountChanged(memAddr, true, newCount & REF_COUNT_MASK);
}
ReferenceCountHelper.freeRefCountInfo(memAddr);
}
if (freeListManager == null) {
freeListManager = MemoryAllocatorImpl.getAllocator().getFreeListManager();
}
freeListManager.free(memAddr);
} else {
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.refCountChanged(memAddr, true, newCount & REF_COUNT_MASK);
}
}
}
/**
 * Appends the data size, the use count and the hexadecimal memory address to the superclass
 * description.
 *
 * @return a description of this chunk
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder(super.toString());
  text.append(":<dataSize=").append(getDataSize());
  text.append(" refCount=").append(getRefCount());
  text.append(" addr=").append(Long.toHexString(getAddress()));
  text.append(">");
  return text.toString();
}
/**
 * Derives the state of this block from its use count.
 *
 * @return {@code ALLOCATED} while the use count is above zero, otherwise {@code DEALLOCATED}
 */
@Override
public State getState() {
  return getRefCount() > 0 ? State.ALLOCATED : State.DEALLOCATED;
}
/**
* Not supported by this class.
*
* @throws UnsupportedOperationException always
*/
@Override
public MemoryBlock getNextBlock() {
throw new UnsupportedOperationException();
}
/**
* Returns the chunk size, as reported by {@code getSize()}.
*
* @return the size of this block
*/
@Override
public int getBlockSize() {
return getSize();
}
/**
* Not supported by this class.
*
* @throws UnsupportedOperationException always
*/
@Override
public int getSlabId() {
throw new UnsupportedOperationException();
}
/**
* Always returns -1 in this class.
*
* @return -1
*/
@Override
public int getFreeListId() {
return -1;
}
/**
* Always returns null in this class.
*
* @return null
*/
@Override
public String getDataType() {
return null;
}
/**
* Always returns null in this class.
*
* @return null
*/
@Override
public Object getDataValue() {
return null;
}
/**
* Creates an {@code OffHeapStoredObjectSlice} over this object for the given position and limit.
* Both values are passed to the slice unchanged.
*
* @param position passed through to the slice
* @param limit passed through to the slice
* @return the new slice
*/
@Override
public StoredObject slice(int position, int limit) {
return new OffHeapStoredObjectSlice(this, position, limit);
}
/**
* Always returns true in this class.
*
* @return true
*/
@Override
public boolean hasRefCount() {
return true;
}
}