// blob: a7921ee8445ebf8c6cb878597c74ff9395a6611c [file] [log] [blame]
package com.gemstone.gemfire.internal.offheap;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.OutOfOffHeapMemoryException;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.OffHeapTestUtil;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.util.StopWatch;
import dunit.Host;
import dunit.SerializableRunnable;
/**
 * Test behavior of region when running out of off-heap memory.
 *
 * @author Kirk Lund
 */
public class OutOfOffHeapMemoryDUnitTest extends CacheTestCase {
// Logger for this test class, obtained from LogService.
private static final Logger logger = LogService.getLogger();
// Static holder for a Cache. The multi-VM test reads it with cache.get() inside the
// runnables it invokes in each VM. NOTE(review): where it is populated is not visible here.
protected static final AtomicReference<Cache> cache = new AtomicReference<Cache>();
// Static holder for a DistributedSystem.
// NOTE(review): its readers and writers are not visible in this view -- confirm usage.
protected static final AtomicReference<DistributedSystem> system = new AtomicReference<DistributedSystem>();
// Read by getDistributedSystemProperties() to choose between the smaller and the regular
// off-heap memory size. A new AtomicBoolean starts out false, i.e. the regular size.
// NOTE(review): where this is set to true is not visible here.
protected static final AtomicBoolean isSmallerVM = new AtomicBoolean();
/**
 * Creates the test case.
 *
 * @param name the test name. NOTE(review): presumably forwarded to the CacheTestCase
 *        constructor -- the constructor body is not visible in this view.
 */
public OutOfOffHeapMemoryDUnitTest(String name) {
// Setup hook (presumably the per-test JUnit setUp).
// NOTE(review): the body is not visible in this view -- confirm what it does.
public void setUp() throws Exception {
// public static void caseSetUp() {
// //disconnectAllFromDS();
// for (int i = 0; i < Host.getHost(0).getVMCount(); i++) {
// Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
// public void run() {
// InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
// if (ids != null && ids.isConnected()) {
// logger.warn(OutOfOffHeapMemoryDUnitTest.class.getSimpleName() + " found DistributedSystem connection from previous test: {}", ids);
// ids.disconnect();
// }
// }
// });
// }
// }
/**
 * Teardown hook for this test. Defines a checkOrphans runnable whose work is guarded by
 * hasCache(), and uses a finally block to invoke the static "cleanup" method by name in
 * every VM, so that cleanup is still requested if the guarded step throws.
 *
 * NOTE(review): parts of this method are not visible in this view (what checkOrphans
 * does inside the hasCache() guard, and what the try block executes) -- confirm against
 * the full source.
 *
 * @throws Exception declared by the method signature
 */
public void tearDown2() throws Exception {
final SerializableRunnable checkOrphans = new SerializableRunnable() {
public void run() {
// only act when this VM currently has a cache
if(hasCache()) {
// NOTE(review): the ';' after '{' is an empty statement
try {;
} finally {
// the method name is passed as a string, so cleanup() is looked up by name
invokeInEveryVM(getClass(), "cleanup");
// Per-VM cleanup entry point: tearDown2() passes the name "cleanup" to invokeInEveryVM,
// so there is no direct caller and the "unused" warning is suppressed.
// NOTE(review): the body is not visible in this view.
@SuppressWarnings("unused") // invoked by reflection from tearDown2()
private static void cleanup() {
/**
 * Off-heap memory size that getDistributedSystemProperties() configures when
 * isSmallerVM is false.
 *
 * @return the size string "2m"
 */
protected String getOffHeapMemorySize() {
return "2m";
/**
 * Off-heap memory size that getDistributedSystemProperties() configures when
 * isSmallerVM is true.
 *
 * @return the size string "1m"
 */
protected String getSmallerOffHeapMemorySize() {
return "1m";
/**
 * Region shortcut the tests pass to createRegionFactory when creating the off-heap region.
 *
 * @return RegionShortcut.REPLICATE
 */
protected RegionShortcut getRegionShortcut() {
return RegionShortcut.REPLICATE;
/**
 * Name under which the tests create and look up the off-heap region.
 *
 * @return the region name "region1"
 */
protected String getRegionName() {
return "region1";
/**
 * Builds the distributed system properties for this test: the mcast port property is
 * set to "0", statistic sampling is set to "true", and the off-heap memory size is
 * chosen from the current value of isSmallerVM.
 *
 * @return a new Properties instance holding those three settings
 */
public Properties getDistributedSystemProperties() {
final Properties props = new Properties();
props.put(DistributionConfig.MCAST_PORT_NAME, "0");
props.put(DistributionConfig.STATISTIC_SAMPLING_ENABLED_NAME, "true");
// a VM flagged as the smaller one gets getSmallerOffHeapMemorySize(); any other VM
// gets getOffHeapMemorySize()
if (isSmallerVM.get()) {
props.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, getSmallerOffHeapMemorySize());
} else {
props.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, getOffHeapMemorySize());
return props;
/**
 * Fills an off-heap region in this member until a put throws
 * OutOfOffHeapMemoryException, then checks that the member shuts down as a result:
 * the cache closes, the system disconnects, the distribution manager closes, and
 * GemFireCacheImpl.getInstance() becomes null. Finally it expects
 * DistributedSystemDisconnectedException from the system, the dm and the cache, each
 * carrying an OutOfOffHeapMemoryException in its cause chain.
 *
 * NOTE(review): in this view each of the three try blocks contains only the fail(...)
 * call; the statement expected to throw (named in each fail message) is not visible.
 * Confirm against the full source.
 */
public void testSimpleOutOfOffHeapMemoryMemberDisconnects() {
// these locals share their names with the static 'system' and 'cache' holder fields
// and shadow them inside this method
final DistributedSystem system = getSystem();
final Cache cache = getCache();
final DistributionManager dm = (DistributionManager)((InternalDistributedSystem)system).getDistributionManager();
Region<Object, Object> region = cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).create(getRegionName());
// NOTE(review): assigned in the catch below; no later read is visible in this view
OutOfOffHeapMemoryException ooohme;
try {
// the same 1 KB array is stored under a new key on every iteration
Object value = new byte[1024];
// the loop condition is the literal true: the exception from put is the only exit
for (int i = 0; true; i++) {
region.put("key-"+i, value);
} catch (OutOfOffHeapMemoryException e) {
ooohme = e;
// wait for cache to close and system to disconnect
final WaitCriterion waitForDisconnect = new WaitCriterion() {
public boolean done() {
return cache.isClosed() && !system.isConnected() && dm.isClosed();
public String description() {
return "Waiting for cache, system and dm to close";
// presumably: 10 s timeout, 100 ms poll interval, throw on timeout -- confirm
// against waitForCriterion's declaration
waitForCriterion(waitForDisconnect, 10*1000, 100, true);
// wait for cache instance to be nulled out
final WaitCriterion waitForNull = new WaitCriterion() {
public boolean done() {
return GemFireCacheImpl.getInstance() == null;
public String description() {
return "Waiting for GemFireCacheImpl to null its instance";
waitForCriterion(waitForNull, 10*1000, 100, true);
// verify system was closed out due to OutOfOffHeapMemoryException
InternalDistributedSystem ids = (InternalDistributedSystem)system;
try {
fail("InternalDistributedSystem.getDistributionManager() should throw DistributedSystemDisconnectedException");
} catch (DistributedSystemDisconnectedException expected) {
assertRootCause(expected, OutOfOffHeapMemoryException.class);
// verify dm was closed out due to OutOfOffHeapMemoryException
try {
fail("DistributionManager.throwIfDistributionStopped() should throw DistributedSystemDisconnectedException");
} catch (DistributedSystemDisconnectedException expected) {
assertRootCause(expected, OutOfOffHeapMemoryException.class);
// verify cache was closed out due to OutOfOffHeapMemoryException
try {
fail("GemFireCacheImpl.getCancelCriterion().checkCancelInProgress should throw DistributedSystemDisconnectedException");
} catch (DistributedSystemDisconnectedException expected) {
assertRootCause(expected, OutOfOffHeapMemoryException.class);
// verify Cache and IDS are nulled out
/**
 * Asserts that the cause chain of {@code throwable} contains an exception whose class
 * is exactly {@code expected}.
 *
 * The walk starts at {@code throwable.getCause()}, so the throwable itself is never
 * compared. Classes are compared with equals, so a subclass of {@code expected} does
 * not count as a match. Despite the name, a match anywhere in the chain passes, not
 * only at the deepest cause.
 *
 * @param throwable the exception whose causes are inspected
 * @param expected the exact class that must appear among the causes
 * @throws AssertionError if no cause has that class; {@code throwable} is attached as
 *         the error's cause
 */
private void assertRootCause(Throwable throwable, Class<?> expected) {
boolean passed = false;
Throwable cause = throwable.getCause();
// follow getCause() links until the chain ends
while (cause != null) {
if (cause.getClass().equals(expected)) {
passed = true;
cause = cause.getCause();
if (!passed) {
throw new AssertionError("Throwable does not contain expected root cause " + expected, throwable);
/**
 * Multi-VM scenario. Requires exactly 4 VMs on host 0, with VM 0 as the "bigger" member
 * and VM 1 as the "smaller" one. Every VM creates the off-heap region and checks the
 * expected member counts. The bigger VM then puts 1 KB values until its count of other
 * members drops by one, which the test attributes to the smaller member running out of
 * off-heap memory. Each remaining VM then checks the reduced member counts.
 *
 * NOTE(review): several pieces are not visible in this view -- the bodies of the two
 * runnables invoked on the smaller VM, the tails of the truncated assertEquals and
 * countOtherMembers lines, and the body of the 'if (i == smallerVM)' guard. Confirm
 * them against the full source.
 */
public void testOtherMembersSeeOutOfOffHeapMemoryMemberDisconnects() {
final int vmCount = Host.getHost(0).getVMCount();
// the member-count arithmetic below assumes exactly four VMs
assertEquals(4, vmCount);
final String name = getRegionName();
final RegionShortcut shortcut = getRegionShortcut();
final int biggerVM = 0;
final int smallerVM = 1;
// NOTE(review): this runnable's body is not visible; presumably it marks VM 1 as the
// smaller VM (see isSmallerVM) -- confirm
Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() {
public void run() {
// create off-heap region in all 4 members
for (int i = 0; i < vmCount; i++) {
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
// uses the static cache holder of the VM in which this runnable executes
final Region<Object, Object> region = OutOfOffHeapMemoryDUnitTest.cache.get().createRegionFactory(shortcut).setOffHeap(true).create(name);
// make sure there are 5 members total
for (int i = 0; i < vmCount; i++) {
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
final int countMembersPlusLocator = vmCount+1; // +1 for locator
final int countOtherMembers = vmCount-1; // -1 one for self
assertEquals(countMembersPlusLocator, ((InternalDistributedSystem)OutOfOffHeapMemoryDUnitTest
assertEquals(countOtherMembers, ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
// perform puts in bigger member until smaller member goes OOOHME
Host.getHost(0).getVM(biggerVM).invoke(new SerializableRunnable() {
public void run() {
// 30 second budget, in milliseconds, for the puts to push the smaller member out
final long TIME_LIMIT = 30 * 1000;
final StopWatch stopWatch = new StopWatch(true);
int countOtherMembers = vmCount-1; // -1 for self
final int countOtherMembersMinusSmaller = vmCount-1-1; // -1 for self, -1 for smallerVM
final Region<Object, Object> region = OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name);
// keep putting while the count of other members is still above the target;
// the count is refreshed and the time budget is checked on every iteration
for (int i = 0; countOtherMembers > countOtherMembersMinusSmaller; i++) {
region.put("key-"+i, new byte[1024]);
countOtherMembers = ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
assertTrue("puts failed to push member out of off-heap memory within time limit", stopWatch.elapsedTimeMillis() < TIME_LIMIT);
assertEquals("Member did not depart from OutOfOffHeapMemory", countOtherMembersMinusSmaller, countOtherMembers);
// verify that member with OOOHME closed
Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() {
public void run() {
// verify that all other members noticed smaller member closed
for (int i = 0; i < vmCount; i++) {
// NOTE(review): guard body not visible; presumably skips the departed smaller VM
if (i == smallerVM) {
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
final int countMembersPlusLocator = vmCount+1-1; // +1 for locator, -1 for OOOHME member
final int countOtherMembers = vmCount-1-1; // -1 for self, -1 for OOOHME member
assertEquals(countMembersPlusLocator, ((InternalDistributedSystem)OutOfOffHeapMemoryDUnitTest
assertEquals(countOtherMembers, ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
// private static void foo() {
// final WaitCriterion waitForDisconnect = new WaitCriterion() {
// public boolean done() {
// return cache.isClosed() && !system.isConnected() && dm.isClosed();
// }
// public String description() {
// return "Waiting for cache, system and dm to close";
// }
// };
// waitForCriterion(waitForDisconnect, 10*1000, 100, true);
// }
// setUp() and caseSetUp() are commented out -- they were in place because of incompatible DistributedSystem bleed over from earlier DUnit tests
//public void setUp() throws Exception {
// super.setUp();
// long begin = System.currentTimeMillis();
// Cache gfc = null;
// while (gfc == null) {
// try {
// gfc = getCache();
// break;
// } catch (IllegalStateException e) {
// if (System.currentTimeMillis() > begin+60*1000) {
// fail("OutOfOffHeapMemoryDUnitTest waited too long to getCache", e);
// } else if (e.getMessage().contains("A connection to a distributed system already exists in this VM. It has the following configuration")) {
// InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
// if (ids != null && ids.isConnected()) {
// ids.getLogWriter().warning("OutOfOffHeapMemoryDUnitTest found DistributedSystem connection from previous test", e);
// ids.disconnect();
// }
// } else {
// throw e;
// }
// }
// }