// blob: a7921ee8445ebf8c6cb878597c74ff9395a6611c [file] [log] [blame]
package com.gemstone.gemfire.internal.offheap;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.OutOfOffHeapMemoryException;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.OffHeapTestUtil;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.util.StopWatch;
import dunit.Host;
import dunit.SerializableRunnable;
/**
* Tests how a member hosting an off-heap region, and the other members of the
* distributed system, behave when that member runs out of off-heap memory.
*
* @author Kirk Lund
*/
@SuppressWarnings("serial")
public class OutOfOffHeapMemoryDUnitTest extends CacheTestCase {
/** Logger for this test class. */
private static final Logger logger = LogService.getLogger();
/** Static (per-JVM) holder for the Cache stored by the multi-member test; reset to null by cleanup(). */
protected static final AtomicReference<Cache> cache = new AtomicReference<Cache>();
/** Static (per-JVM) holder for the DistributedSystem stored by the multi-member test; reset to null by cleanup(). */
protected static final AtomicReference<DistributedSystem> system = new AtomicReference<DistributedSystem>();
/** When true, getDistributedSystemProperties() uses the smaller off-heap size in this JVM; reset to false by cleanup(). */
protected static final AtomicBoolean isSmallerVM = new AtomicBoolean();
/**
 * Creates the test case.
 *
 * @param name test name handed to the superclass constructor
 */
public OutOfOffHeapMemoryDUnitTest(String name) {
super(name);
}
/**
 * Calls disconnectAllFromDS() before the superclass set-up runs, then registers
 * the simple name of OutOfOffHeapMemoryException as an expected exception.
 *
 * @throws Exception if the superclass set-up fails
 */
@Override
public void setUp() throws Exception {
// drop any distributed system connection that is still around before the regular set-up
disconnectAllFromDS();
super.setUp();
// NOTE(review): presumably keeps the deliberately provoked exception from being reported
// as a suspicious log entry -- confirm against addExpectedException()
addExpectedException(OutOfOffHeapMemoryException.class.getSimpleName());
}
// public static void caseSetUp() {
// //disconnectAllFromDS();
// for (int i = 0; i < Host.getHost(0).getVMCount(); i++) {
// Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
// public void run() {
// InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
// if (ids != null && ids.isConnected()) {
// logger.warn(OutOfOffHeapMemoryDUnitTest.class.getSimpleName() + " found DistributedSystem connection from previous test: {}", ids);
// ids.disconnect();
// }
// }
// });
// }
// }
/**
 * Checks every dunit VM and then this VM for orphaned off-heap memory (only where
 * a cache exists), and afterwards always runs the per-VM {@code cleanup} and the
 * superclass tear-down.
 *
 * @throws Exception if an orphan check, the cleanup or the superclass tear-down fails
 */
@Override
public void tearDown2() throws Exception {
  final SerializableRunnable checkOrphans = new SerializableRunnable() {
    @Override
    public void run() {
      if (hasCache()) {
        OffHeapTestUtil.checkOrphans();
      }
    }
  };
  try {
    // The remote check lives inside the try so that a failure in any VM cannot
    // skip the cleanup below and leak state into the tests that follow.
    invokeInEveryVM(checkOrphans);
    checkOrphans.run();
  } finally {
    try {
      invokeInEveryVM(getClass(), "cleanup");
    } finally {
      // Run the superclass tear-down even when the remote cleanup itself fails.
      super.tearDown2();
    }
  }
}
/**
 * Cleanup executed in every VM at the end of a test: disconnects from the
 * distributed system, frees the off-heap memory and resets the static state
 * of this class to its defaults.
 */
@SuppressWarnings("unused") // invoked by reflection from tearDown2()
private static void cleanup() {
disconnectFromDS();
SimpleMemoryAllocatorImpl.freeOffHeapMemory();
// reset the static holders so nothing set by this test is visible to the next one
cache.set(null);
system.set(null);
isSmallerVM.set(false);
}
/**
 * Off-heap memory size used by getDistributedSystemProperties() when this VM is
 * not flagged as the smaller VM.
 *
 * @return the size string, "2m"
 */
protected String getOffHeapMemorySize() {
return "2m";
}
/**
 * Off-heap memory size used by getDistributedSystemProperties() when this VM is
 * flagged as the smaller VM (isSmallerVM is true).
 *
 * @return the size string, "1m"
 */
protected String getSmallerOffHeapMemorySize() {
return "1m";
}
/**
 * Region shortcut the tests pass to createRegionFactory() when creating the
 * off-heap region.
 *
 * @return RegionShortcut.REPLICATE
 */
protected RegionShortcut getRegionShortcut() {
return RegionShortcut.REPLICATE;
}
/**
 * Name of the off-heap region the tests create and look up.
 *
 * @return the region name, "region1"
 */
protected String getRegionName() {
return "region1";
}
/**
 * Builds the distributed system properties for this test: multicast port "0",
 * statistic sampling enabled, and an off-heap memory size that depends on
 * whether this VM has been flagged as the smaller VM.
 *
 * @return a new Properties instance holding the three settings
 */
@Override
public Properties getDistributedSystemProperties() {
  final Properties config = new Properties();
  config.put(DistributionConfig.MCAST_PORT_NAME, "0");
  config.put(DistributionConfig.STATISTIC_SAMPLING_ENABLED_NAME, "true");
  // The VM flagged through isSmallerVM gets the reduced off-heap allotment.
  final String offHeapSize = isSmallerVM.get()
      ? getSmallerOffHeapMemorySize()
      : getOffHeapMemorySize();
  config.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, offHeapSize);
  return config;
}
/**
 * Fills an off-heap region in this VM with 1 KB values until a put throws
 * OutOfOffHeapMemoryException, then verifies that the cache, the distributed
 * system and the distribution manager all close, that each of them reports a
 * DistributedSystemDisconnectedException whose cause chain contains the
 * OutOfOffHeapMemoryException, and that the static cache and system instances
 * are nulled out.
 */
public void testSimpleOutOfOffHeapMemoryMemberDisconnects() {
  final DistributedSystem localSystem = getSystem();
  final Cache localCache = getCache();
  final InternalDistributedSystem internalSystem = (InternalDistributedSystem) localSystem;
  final DistributionManager distributionManager =
      (DistributionManager) internalSystem.getDistributionManager();
  final Region<Object, Object> offHeapRegion =
      localCache.createRegionFactory(getRegionShortcut()).setOffHeap(true).create(getRegionName());

  // Keep adding entries until off-heap memory is exhausted; the only way out of
  // the loop is the exception thrown by put().
  OutOfOffHeapMemoryException outOfMemory = null;
  final Object payload = new byte[1024];
  int nextKey = 0;
  try {
    while (true) {
      offHeapRegion.put("key-" + nextKey, payload);
      nextKey++;
    }
  } catch (OutOfOffHeapMemoryException e) {
    outOfMemory = e;
  }
  assertNotNull(outOfMemory);

  final int timeoutMillis = 10 * 1000;
  final int pollMillis = 100;

  // Closing happens asynchronously, so poll until cache, system and dm are all down.
  final WaitCriterion everythingClosed = new WaitCriterion() {
    @Override
    public boolean done() {
      if (!localCache.isClosed()) {
        return false;
      }
      if (localSystem.isConnected()) {
        return false;
      }
      return distributionManager.isClosed();
    }

    @Override
    public String description() {
      return "Waiting for cache, system and dm to close";
    }
  };
  waitForCriterion(everythingClosed, timeoutMillis, pollMillis, true);

  // Then poll until the static cache instance has been cleared.
  final WaitCriterion cacheInstanceCleared = new WaitCriterion() {
    @Override
    public boolean done() {
      return GemFireCacheImpl.getInstance() == null;
    }

    @Override
    public String description() {
      return "Waiting for GemFireCacheImpl to null its instance";
    }
  };
  waitForCriterion(cacheInstanceCleared, timeoutMillis, pollMillis, true);
  assertNull(GemFireCacheImpl.getInstance());

  // The system must be disconnected, with the OOOHME recorded as the reason.
  assertFalse(localSystem.isConnected());
  try {
    internalSystem.getDistributionManager();
    fail("InternalDistributedSystem.getDistributionManager() should throw DistributedSystemDisconnectedException");
  } catch (DistributedSystemDisconnectedException systemDown) {
    assertRootCause(systemDown, OutOfOffHeapMemoryException.class);
  }

  // The distribution manager must be closed, with the OOOHME recorded as the reason.
  assertTrue(distributionManager.isClosed());
  try {
    distributionManager.throwIfDistributionStopped();
    fail("DistributionManager.throwIfDistributionStopped() should throw DistributedSystemDisconnectedException");
  } catch (DistributedSystemDisconnectedException distributionDown) {
    assertRootCause(distributionDown, OutOfOffHeapMemoryException.class);
  }

  // The cache must be closed, with the OOOHME recorded as the reason.
  assertTrue(localCache.isClosed());
  try {
    localCache.getCancelCriterion().checkCancelInProgress(null);
    fail("GemFireCacheImpl.getCancelCriterion().checkCancelInProgress should throw DistributedSystemDisconnectedException");
  } catch (DistributedSystemDisconnectedException cacheDown) {
    assertRootCause(cacheDown, OutOfOffHeapMemoryException.class);
  }

  // Neither a cache nor a distributed system instance may be left behind.
  assertNull(GemFireCacheImpl.getInstance());
  assertNull(InternalDistributedSystem.getAnyInstance());
}
/**
 * Asserts that the cause chain of {@code throwable} contains a throwable whose
 * class is exactly {@code expected}. The search starts at
 * {@code throwable.getCause()}, so {@code throwable} itself is never a match,
 * and subclasses of {@code expected} do not count.
 *
 * @param throwable the throwable whose causes are inspected
 * @param expected the exact class that must appear among the causes
 * @throws AssertionError if no cause has the expected class; {@code throwable}
 *         is attached as the cause of the error
 */
private void assertRootCause(Throwable throwable, Class<?> expected) {
  for (Throwable link = throwable.getCause(); link != null; link = link.getCause()) {
    if (link.getClass().equals(expected)) {
      return;
    }
  }
  throw new AssertionError("Throwable does not contain expected root cause " + expected, throwable);
}
/**
 * Multi-VM scenario: VM 1 is flagged as the smaller VM, every VM creates the same
 * off-heap region, and VM 0 then performs puts until the region's distribution
 * advisor in VM 0 reports one profile fewer. Afterwards the test verifies that
 * VM 1's cache is closed and its system disconnected, and that every other VM
 * sees one member and one region profile fewer than before.
 * <p>
 * The test requires exactly four dunit VMs.
 */
public void testOtherMembersSeeOutOfOffHeapMemoryMemberDisconnects() {
final int vmCount = Host.getHost(0).getVMCount();
assertEquals(4, vmCount);
final String name = getRegionName();
final RegionShortcut shortcut = getRegionShortcut();
final int biggerVM = 0;
final int smallerVM = 1;
// flag VM 1 so that getDistributedSystemProperties() picks the smaller off-heap size there;
// this has to happen before that VM connects in the next step
Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() {
public void run() {
OutOfOffHeapMemoryDUnitTest.isSmallerVM.set(true);
}
});
// create off-heap region in all 4 members and remember each VM's cache and system in the static holders
for (int i = 0; i < vmCount; i++) {
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
OutOfOffHeapMemoryDUnitTest.cache.set(getCache());
OutOfOffHeapMemoryDUnitTest.system.set(getSystem());
final Region<Object, Object> region = OutOfOffHeapMemoryDUnitTest.cache.get().createRegionFactory(shortcut).setOffHeap(true).create(name);
assertNotNull(region);
}
});
}
// make sure there are 5 members total (the 4 VMs plus the locator) and that each VM's region sees the 3 other VMs
for (int i = 0; i < vmCount; i++) {
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
assertFalse(OutOfOffHeapMemoryDUnitTest.cache.get().isClosed());
assertTrue(OutOfOffHeapMemoryDUnitTest.system.get().isConnected());
final int countMembersPlusLocator = vmCount+1; // +1 for locator
final int countOtherMembers = vmCount-1; // -1 one for self
assertEquals(countMembersPlusLocator, ((InternalDistributedSystem)OutOfOffHeapMemoryDUnitTest
.system.get()).getDistributionManager().getDistributionManagerIds().size());
assertEquals(countOtherMembers, ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
.cache.get().getRegion(name)).getDistributionAdvisor().getNumProfiles());
}
});
}
// perform puts in bigger member until smaller member goes OOOHME
Host.getHost(0).getVM(biggerVM).invoke(new SerializableRunnable() {
public void run() {
final long TIME_LIMIT = 30 * 1000;
final StopWatch stopWatch = new StopWatch(true);
int countOtherMembers = vmCount-1; // -1 for self
final int countOtherMembersMinusSmaller = vmCount-1-1; // -1 for self, -1 for smallerVM
final Region<Object, Object> region = OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name);
// keep putting 1 KB values until this VM's advisor reports that one profile is gone;
// the elapsed time is checked after every put so the loop cannot run unbounded
for (int i = 0; countOtherMembers > countOtherMembersMinusSmaller; i++) {
region.put("key-"+i, new byte[1024]);
countOtherMembers = ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
.cache.get().getRegion(name)).getDistributionAdvisor().getNumProfiles();
assertTrue("puts failed to push member out of off-heap memory within time limit", stopWatch.elapsedTimeMillis() < TIME_LIMIT);
}
assertEquals("Member did not depart from OutOfOffHeapMemory", countOtherMembersMinusSmaller, countOtherMembers);
}
});
// verify that member with OOOHME closed
Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() {
public void run() {
assertTrue(OutOfOffHeapMemoryDUnitTest.cache.get().isClosed());
assertFalse(OutOfOffHeapMemoryDUnitTest.system.get().isConnected());
}
});
// verify that all other members noticed smaller member closed
for (int i = 0; i < vmCount; i++) {
if (i == smallerVM) {
continue;
}
Host.getHost(0).getVM(i).invoke(new SerializableRunnable() {
public void run() {
final int countMembersPlusLocator = vmCount+1-1; // +1 for locator, -1 for OOOHME member
final int countOtherMembers = vmCount-1-1; // -1 for self, -1 for OOOHME member
assertEquals(countMembersPlusLocator, ((InternalDistributedSystem)OutOfOffHeapMemoryDUnitTest
.system.get()).getDistributionManager().getDistributionManagerIds().size());
assertEquals(countOtherMembers, ((DistributedRegion)OutOfOffHeapMemoryDUnitTest
.cache.get().getRegion(name)).getDistributionAdvisor().getNumProfiles());
}
});
}
}
// private static void foo() {
// final WaitCriterion waitForDisconnect = new WaitCriterion() {
// public boolean done() {
// return cache.isClosed() && !system.isConnected() && dm.isClosed();
// }
// public String description() {
// return "Waiting for cache, system and dm to close";
// }
// };
// waitForCriterion(waitForDisconnect, 10*1000, 100, true);
// }
// The older setUp() variant below and caseSetUp() above are commented out -- they were in place because of incompatible DistributedSystem bleed over from earlier DUnit tests; the active setUp() calls disconnectAllFromDS() first
//@Override
//public void setUp() throws Exception {
// super.setUp();
// long begin = System.currentTimeMillis();
// Cache gfc = null;
// while (gfc == null) {
// try {
// gfc = getCache();
// break;
// } catch (IllegalStateException e) {
// if (System.currentTimeMillis() > begin+60*1000) {
// fail("OutOfOffHeapMemoryDUnitTest waited too long to getCache", e);
// } else if (e.getMessage().contains("A connection to a distributed system already exists in this VM. It has the following configuration")) {
// InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
// if (ids != null && ids.isConnected()) {
// ids.getLogWriter().warning("OutOfOffHeapMemoryDUnitTest found DistributedSystem connection from previous test", e);
// ids.disconnect();
// }
// } else {
// throw e;
// }
// }
// }
//}
}