blob: 933579b299ff0ced929f1f4fd245a00800330de6 [file] [log] [blame]
package com.gemstone.gemfire.internal.offheap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;
import org.junit.Test;
import com.gemstone.gemfire.OutOfOffHeapMemoryException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.compression.Compressor;
import com.gemstone.gemfire.compression.SnappyCompressor;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.pdx.PdxReader;
import com.gemstone.gemfire.pdx.PdxSerializable;
import com.gemstone.gemfire.pdx.PdxWriter;
import dunit.DistributedTestCase.WaitCriterion;
/**
* Basic test of regions that use off heap storage.
* Subclasses exist for the different types of offheap store.
*
* @author darrel
*
*/
public abstract class OffHeapRegionBase {
public abstract void configureOffHeapStorage();
public abstract void unconfigureOffHeapStorage();
public abstract int perObjectOverhead();
private GemFireCacheImpl createCache() {
return createCache(false);
}
private GemFireCacheImpl createCache(boolean isPersistent) {
configureOffHeapStorage();
Properties props = new Properties();
props.setProperty("locators", "");
props.setProperty("mcast-port", "0");
props.setProperty("off-heap-memory-size", getOffHeapMemorySize());
GemFireCacheImpl result = (GemFireCacheImpl) new CacheFactory(props).setPdxPersistent(isPersistent).create();
unconfigureOffHeapStorage();
return result;
}
private void closeCache(GemFireCacheImpl gfc, boolean keepOffHeapAllocated) {
gfc.close();
if (!keepOffHeapAllocated) {
SimpleMemoryAllocatorImpl.freeOffHeapMemory();
}
// TODO cleanup default disk store files
}
protected abstract String getOffHeapMemorySize();
@Test
public void testSizeAllocation() {
// prevent cache from closing in reaction to ooom
System.setProperty(OffHeapStorage.STAY_CONNECTED_ON_OUTOFOFFHEAPMEMORY_PROPERTY, "true");
GemFireCacheImpl gfc = createCache();
try {
MemoryAllocator ma = gfc.getOffHeapStore();
assertNotNull(ma);
final long offHeapSize = ma.getFreeMemory();
assertEquals(0, ma.getUsedMemory());
MemoryChunk mc1 = ma.allocate(64, null);
assertEquals(64+perObjectOverhead(), ma.getUsedMemory());
assertEquals(offHeapSize-(64+perObjectOverhead()), ma.getFreeMemory());
mc1.release();
assertEquals(offHeapSize, ma.getFreeMemory());
assertEquals(0, ma.getUsedMemory());
// do an allocation larger than the slab size
// TODO: currently the compact will product slabs bigger than the max slab size
// (see the todo comment on compact() in SimpleMemoryAllocator).
// So we request 20m here since that it the total size.
try {
ma.allocate(1024*1024*20, null);
fail("Expected an out of heap exception");
} catch (OutOfOffHeapMemoryException expected) {
}
assertEquals(0, ma.getUsedMemory());
assertFalse(gfc.isClosed());
} finally {
System.clearProperty(OffHeapStorage.STAY_CONNECTED_ON_OUTOFOFFHEAPMEMORY_PROPERTY);
closeCache(gfc, false);
}
}
public void keep_testOutOfOffHeapMemoryErrorClosesCache() {
// this test is redundant but may be useful
final GemFireCacheImpl gfc = createCache();
try {
MemoryAllocator ma = gfc.getOffHeapStore();
assertNotNull(ma);
final long offHeapSize = ma.getFreeMemory();
assertEquals(0, ma.getUsedMemory());
MemoryChunk mc1 = ma.allocate(64, null);
assertEquals(64+perObjectOverhead(), ma.getUsedMemory());
assertEquals(offHeapSize-(64+perObjectOverhead()), ma.getFreeMemory());
mc1.release();
assertEquals(offHeapSize, ma.getFreeMemory());
assertEquals(0, ma.getUsedMemory());
// do an allocation larger than the slab size
try {
ma.allocate(1024*1024*10, null);
fail("Expected an out of heap exception");
} catch (OutOfOffHeapMemoryException expected) {
// passed
}
assertEquals(0, ma.getUsedMemory());
final WaitCriterion waitForDisconnect = new WaitCriterion() {
public boolean done() {
return gfc.isClosed();
}
public String description() {
return "Waiting for disconnect to complete";
}
};
dunit.DistributedTestCase.waitForCriterion(waitForDisconnect, 10*1000, 100, true);
assertTrue(gfc.isClosed());
} finally {
closeCache(gfc, false);
}
}
@Test
public void testByteArrayAllocation() {
GemFireCacheImpl gfc = createCache();
try {
MemoryAllocator ma = gfc.getOffHeapStore();
assertNotNull(ma);
final long offHeapSize = ma.getFreeMemory();
assertEquals(0, ma.getUsedMemory());
byte[] data = new byte[] {1,2,3,4,5,6,7,8};
MemoryChunk mc1 = (MemoryChunk)ma.allocateAndInitialize(data, false, false, null);
assertEquals(data.length+perObjectOverhead(), ma.getUsedMemory());
assertEquals(offHeapSize-(data.length+perObjectOverhead()), ma.getFreeMemory());
byte[] data2 = new byte[data.length];
mc1.readBytes(0, data2);
assertTrue(Arrays.equals(data, data2));
mc1.release();
assertEquals(offHeapSize, ma.getFreeMemory());
assertEquals(0, ma.getUsedMemory());
// try some small byte[] that don't need to be stored off heap.
data = new byte[] {1,2,3,4,5,6,7};
StoredObject so1 = ma.allocateAndInitialize(data, false, false, null);
assertEquals(0, ma.getUsedMemory());
assertEquals(offHeapSize, ma.getFreeMemory());
data2 = new byte[data.length];
data2 = (byte[])so1.getDeserializedForReading();
assertTrue(Arrays.equals(data, data2));
} finally {
closeCache(gfc, false);
}
}
private void doRegionTest(final RegionShortcut rs, final String rName) {
doRegionTest(rs, rName, false/*compressed*/);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void doRegionTest(final RegionShortcut rs, final String rName, boolean compressed) {
boolean isPersistent = rs == RegionShortcut.LOCAL_PERSISTENT || rs == RegionShortcut.REPLICATE_PERSISTENT || rs == RegionShortcut.PARTITION_PERSISTENT;
GemFireCacheImpl gfc = createCache(isPersistent);
Region r = null;
try {
gfc.setCopyOnRead(true);
final MemoryAllocator ma = gfc.getOffHeapStore();
assertNotNull(ma);
assertEquals(0, ma.getUsedMemory());
Compressor compressor = null;
if (compressed) {
compressor = SnappyCompressor.getDefaultInstance();
}
r = gfc.createRegionFactory(rs).setOffHeap(true).setCompressor(compressor).create(rName);
assertEquals(true, r.isEmpty());
assertEquals(0, ma.getUsedMemory());
Object data = new Integer(123456789);
r.put("key1", data);
//System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
assertTrue(ma.getUsedMemory() == 0);
assertEquals(data, r.get("key1"));
r.invalidate("key1");
assertEquals(0, ma.getUsedMemory());
r.put("key1", data);
assertTrue(ma.getUsedMemory() == 0);
long usedBeforeUpdate = ma.getUsedMemory();
r.put("key1", data);
assertEquals(usedBeforeUpdate, ma.getUsedMemory());
assertEquals(data, r.get("key1"));
r.destroy("key1");
assertEquals(0, ma.getUsedMemory());
data = new Long(0x007FFFFFL);
r.put("key1", data);
if (!compressed) {
assertTrue(ma.getUsedMemory() == 0);
}
assertEquals(data, r.get("key1"));
data = new Long(0xFF8000000L);
r.put("key1", data);
if (!compressed) {
assertTrue(ma.getUsedMemory() == 0);
}
assertEquals(data, r.get("key1"));
// now lets set data to something that will be stored offheap
data = new Long(Long.MAX_VALUE);
r.put("key1", data);
assertEquals(data, r.get("key1"));
//System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
assertTrue(ma.getUsedMemory() > 0);
data = new Long(Long.MIN_VALUE);
r.put("key1", data);
assertEquals(data, r.get("key1"));
//System.out.println("After put of Integer value off heap used memory=" + ma.getUsedMemory());
assertTrue(ma.getUsedMemory() > 0);
r.invalidate("key1");
assertEquals(0, ma.getUsedMemory());
r.put("key1", data);
assertTrue(ma.getUsedMemory() > 0);
usedBeforeUpdate = ma.getUsedMemory();
r.put("key1", data);
assertEquals(usedBeforeUpdate, ma.getUsedMemory());
assertEquals(data, r.get("key1"));
r.destroy("key1");
assertEquals(0, ma.getUsedMemory());
// confirm that byte[] do use off heap
{
byte[] originalBytes = new byte[1024];
Object oldV = r.put("byteArray", originalBytes);
long startUsedMemory = ma.getUsedMemory();
assertEquals(null, oldV);
byte[] readBytes = (byte[]) r.get("byteArray");
if (originalBytes == readBytes) {
fail("Expected different byte[] identity");
}
if (!Arrays.equals(readBytes, originalBytes)) {
fail("Expected byte array contents to be equal");
}
assertEquals(startUsedMemory, ma.getUsedMemory());
oldV = r.put("byteArray", originalBytes);
if (!compressed) {
assertEquals(null, oldV); // we default to old value being null for offheap
}
assertEquals(startUsedMemory, ma.getUsedMemory());
readBytes = (byte[])r.putIfAbsent("byteArray", originalBytes);
if (originalBytes == readBytes) {
fail("Expected different byte[] identity");
}
if (!Arrays.equals(readBytes, originalBytes)) {
fail("Expected byte array contents to be equal");
}
assertEquals(startUsedMemory, ma.getUsedMemory());
if (!r.replace("byteArray", readBytes, originalBytes)) {
fail("Expected replace to happen");
}
assertEquals(startUsedMemory, ma.getUsedMemory());
byte[] otherBytes = new byte[1024];
otherBytes[1023] = 1;
if (r.replace("byteArray", otherBytes, originalBytes)) {
fail("Expected replace to not happen");
}
if (r.replace("byteArray", "bogus string", originalBytes)) {
fail("Expected replace to not happen");
}
if (r.remove("byteArray", "bogus string")) {
fail("Expected remove to not happen");
}
assertEquals(startUsedMemory, ma.getUsedMemory());
if (!r.remove("byteArray", originalBytes)) {
fail("Expected remove to happen");
}
assertEquals(0, ma.getUsedMemory());
oldV = r.putIfAbsent("byteArray", "string value");
assertEquals(null, oldV);
assertEquals("string value", r.get("byteArray"));
if (r.replace("byteArray", "string valuE", originalBytes)) {
fail("Expected replace to not happen");
}
if (!r.replace("byteArray", "string value", originalBytes)) {
fail("Expected replace to happen");
}
oldV = r.destroy("byteArray"); // we default to old value being null for offheap
if (!compressed) {
assertEquals(null, oldV);
}
MyCacheListener listener = new MyCacheListener();
r.getAttributesMutator().addCacheListener(listener);
try {
Object valueToReplace = "string value1";
r.put("byteArray", valueToReplace);
assertEquals(null, listener.ohOldValue);
if (!compressed) {
assertEquals("string value1", listener.ohNewValue.getDeserializedForReading());
valueToReplace = listener.ohNewValue;
}
if (!r.replace("byteArray", valueToReplace, "string value2")) {
fail("expected replace to happen");
}
if (!compressed) {
assertEquals("string value2", listener.ohNewValue.getDeserializedForReading());
assertEquals("string value1", listener.ohOldValue.getDeserializedForReading());
}
// make sure that a custom equals/hashCode are not used when comparing values.
} finally {
r.getAttributesMutator().removeCacheListener(listener);
}
}
assertTrue(ma.getUsedMemory() > 0);
{
Object key = "MyValueWithPartialEquals";
MyValueWithPartialEquals v1 = new MyValueWithPartialEquals("s1");
MyValueWithPartialEquals v2 = new MyValueWithPartialEquals("s2");
MyValueWithPartialEquals v3 = new MyValueWithPartialEquals("s1");
r.put(key, v1);
try {
if (r.replace(key, v2, "should not happen")) {
fail("v1 should not be equal to v2 on an offheap region");
}
if (!r.replace(key, v3, "should happen")) {
fail("v1 should be equal to v3 on an offheap region");
}
r.put(key, v1);
if (r.remove(key, v2)) {
fail("v1 should not be equal to v2 on an offheap region");
}
if (!r.remove(key, v3)) {
fail("v1 should be equal to v3 on an offheap region");
}
} finally {
r.remove(key);
}
}
{
Object key = "MyPdxWithPartialEquals";
MyPdxWithPartialEquals v1 = new MyPdxWithPartialEquals("s", "1");
MyPdxWithPartialEquals v2 = new MyPdxWithPartialEquals("s", "2");
MyPdxWithPartialEquals v3 = new MyPdxWithPartialEquals("t", "1");
r.put(key, v1);
try {
if (r.replace(key, v3, "should not happen")) {
fail("v1 should not be equal to v3 on an offheap region");
}
if (!r.replace(key, v2, "should happen")) {
fail("v1 should be equal to v2 on an offheap region");
}
r.put(key, v1);
if (r.remove(key, v3)) {
fail("v1 should not be equal to v3 on an offheap region");
}
if (!r.remove(key, v2)) {
fail("v1 should be equal to v2 on an offheap region");
}
} finally {
r.remove(key);
}
}
byte[] value = new byte[1024];
/*while (value != null) */ {
r.put("byteArray", value);
}
r.remove("byteArray");
assertEquals(0, ma.getUsedMemory());
r.put("key1", data);
assertTrue(ma.getUsedMemory() > 0);
r.invalidateRegion();
assertEquals(0, ma.getUsedMemory());
r.put("key1", data);
assertTrue(ma.getUsedMemory() > 0);
try {
r.clear();
assertEquals(0, ma.getUsedMemory());
} catch (UnsupportedOperationException ok) {
}
r.put("key1", data);
assertTrue(ma.getUsedMemory() > 0);
if (r.getAttributes().getDataPolicy().withPersistence()) {
r.put("key2", Integer.valueOf(1234567890));
r.put("key3", new Long(0x007FFFFFL));
r.put("key4", new Long(0xFF8000000L));
assertEquals(4, r.size());
r.close();
assertEquals(0, ma.getUsedMemory());
// simple test of recovery
r = gfc.createRegionFactory(rs).setOffHeap(true).create(rName);
assertEquals(4, r.size());
assertEquals(data, r.get("key1"));
assertEquals(Integer.valueOf(1234567890), r.get("key2"));
assertEquals(new Long(0x007FFFFFL), r.get("key3"));
assertEquals(new Long(0xFF8000000L), r.get("key4"));
closeCache(gfc, true);
assertEquals(0, ma.getUsedMemory());
gfc = createCache();
if (ma != gfc.getOffHeapStore()) {
fail("identity of offHeapStore changed when cache was recreated");
}
r = gfc.createRegionFactory(rs).setOffHeap(true).create(rName);
assertTrue(ma.getUsedMemory() > 0);
assertEquals(4, r.size());
assertEquals(data, r.get("key1"));
assertEquals(Integer.valueOf(1234567890), r.get("key2"));
assertEquals(new Long(0x007FFFFFL), r.get("key3"));
assertEquals(new Long(0xFF8000000L), r.get("key4"));
}
r.destroyRegion();
assertEquals(0, ma.getUsedMemory());
} finally {
if (r != null && !r.isDestroyed()) {
r.destroyRegion();
}
closeCache(gfc, false);
}
}
/**
* This class has an equals that does not compare all its bytes.
*/
private static class MyValueWithPartialEquals implements Serializable {
private static final long serialVersionUID = 1L;
private final String value;
public MyValueWithPartialEquals(String v) {
this.value = v;
}
@Override public boolean equals(Object other) {
if (other instanceof MyValueWithPartialEquals) {
MyValueWithPartialEquals o = (MyValueWithPartialEquals) other;
// just compare the first char
return this.value.charAt(0) == o.value.charAt(0);
} else {
return false;
}
}
}
/**
* This class has an equals that does not compare all its bytes.
*/
private static class MyPdxWithPartialEquals implements PdxSerializable {
private String base;
private String value;
public MyPdxWithPartialEquals(String b, String v) {
this.base = b;
this.value = v;
}
public MyPdxWithPartialEquals() {
}
@Override
public void toData(PdxWriter writer) {
writer.writeString("base", this.base);
writer.writeString("value", this.value);
writer.markIdentityField("base");
}
@Override
public void fromData(PdxReader reader) {
this.base = reader.readString("base");
this.value = reader.readString("value");
}
}
@SuppressWarnings("rawtypes")
private static class MyCacheListener extends CacheListenerAdapter {
@Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
public StoredObject ohOldValue;
@Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
public StoredObject ohNewValue;
/**
* This method retains both ohOldValue and ohNewValue
*/
@Retained(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
private void setEventData(EntryEvent e) {
close();
EntryEventImpl event = (EntryEventImpl) e;
this.ohOldValue = event.getOffHeapOldValue();
this.ohNewValue = event.getOffHeapNewValue();
}
@Override
public void afterCreate(EntryEvent e) {
setEventData(e);
}
@Override
public void afterDestroy(EntryEvent e) {
setEventData(e);
}
@Override
public void afterInvalidate(EntryEvent e) {
setEventData(e);
}
@Override
public void afterUpdate(EntryEvent e) {
setEventData(e);
}
@Released(OffHeapIdentifier.TEST_OFF_HEAP_REGION_BASE_LISTENER)
@Override
public void close() {
if (this.ohOldValue instanceof Chunk) {
((Chunk)this.ohOldValue).release();
}
if (this.ohNewValue instanceof Chunk) {
((Chunk)this.ohNewValue).release();
}
}
}
@Test
public void testPR() {
doRegionTest(RegionShortcut.PARTITION, "pr1");
}
@Test
public void testPRCompressed() {
doRegionTest(RegionShortcut.PARTITION, "pr2", true);
}
@Test
public void testReplicate() {
doRegionTest(RegionShortcut.REPLICATE, "rep1");
}
@Test
public void testReplicateCompressed() {
doRegionTest(RegionShortcut.REPLICATE, "rep2", true);
}
@Test
public void testLocal() {
doRegionTest(RegionShortcut.LOCAL, "local1");
}
@Test
public void testLocalCompressed() {
doRegionTest(RegionShortcut.LOCAL, "local2", true);
}
@Test
public void testLocalPersistent() {
doRegionTest(RegionShortcut.LOCAL_PERSISTENT, "localPersist1");
}
@Test
public void testLocalPersistentCompressed() {
doRegionTest(RegionShortcut.LOCAL_PERSISTENT, "localPersist2", true);
}
@Test
public void testPRPersistent() {
doRegionTest(RegionShortcut.PARTITION_PERSISTENT, "prPersist1");
}
@Test
public void testPRPersistentCompressed() {
doRegionTest(RegionShortcut.PARTITION_PERSISTENT, "prPersist2", true);
}
}