| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.offheap; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE; |
| import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.util.Properties; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.OutOfOffHeapMemoryException; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.OffHeapTestUtil; |
| import org.apache.geode.internal.util.StopWatch; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.OffHeapTest; |
| |
| /** |
| * Test behavior of region when running out of off-heap memory. |
| */ |
| @Category({OffHeapTest.class}) |
| @SuppressWarnings("serial") |
| public class OutOfOffHeapMemoryDUnitTest extends JUnit4CacheTestCase { |
| |
| protected static final AtomicReference<Cache> cache = new AtomicReference<>(); |
| protected static final AtomicReference<DistributedSystem> system = |
| new AtomicReference<>(); |
| protected static final AtomicBoolean isSmallerVM = new AtomicBoolean(); |
| |
| @Override |
| public final void preSetUp() throws Exception { |
| disconnectAllFromDS(); |
| } |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| IgnoredException.addIgnoredException(OutOfOffHeapMemoryException.class.getSimpleName()); |
| } |
| |
| @Override |
| public final void preTearDownAssertions() throws Exception { |
| final SerializableRunnable checkOrphans = new SerializableRunnable() { |
| @Override |
| public void run() { |
| if (hasCache()) { |
| OffHeapTestUtil.checkOrphans(getCache()); |
| } |
| } |
| }; |
| Invoke.invokeInEveryVM(checkOrphans); |
| checkOrphans.run(); |
| } |
| |
| @SuppressWarnings("unused") // invoked by reflection from tearDown2() |
| private static void cleanup() { |
| disconnectFromDS(); |
| MemoryAllocatorImpl.freeOffHeapMemory(); |
| cache.set(null); |
| system.set(null); |
| isSmallerVM.set(false); |
| } |
| |
| protected String getOffHeapMemorySize() { |
| return "2m"; |
| } |
| |
| protected String getSmallerOffHeapMemorySize() { |
| return "1m"; |
| } |
| |
| protected RegionShortcut getRegionShortcut() { |
| return RegionShortcut.REPLICATE; |
| } |
| |
| protected String getRegionName() { |
| return "region1"; |
| } |
| |
| @Override |
| public Properties getDistributedSystemProperties() { |
| final Properties props = new Properties(); |
| props.put(MCAST_PORT, "0"); |
| props.put(STATISTIC_SAMPLING_ENABLED, "true"); |
| if (isSmallerVM.get()) { |
| props.setProperty(OFF_HEAP_MEMORY_SIZE, getSmallerOffHeapMemorySize()); |
| } else { |
| props.setProperty(OFF_HEAP_MEMORY_SIZE, getOffHeapMemorySize()); |
| } |
| return props; |
| } |
| |
| @Test |
| public void testSimpleOutOfOffHeapMemoryMemberDisconnects() { |
| final DistributedSystem system = getSystem(); |
| final Cache cache = getCache(); |
| final ClusterDistributionManager dm = |
| (ClusterDistributionManager) ((InternalDistributedSystem) system).getDistributionManager(); |
| |
| Region<Object, Object> region = |
| cache.createRegionFactory(getRegionShortcut()).setOffHeap(true).create(getRegionName()); |
| OutOfOffHeapMemoryException ooohme; |
| try { |
| Object value = new byte[1024]; |
| for (int i = 0; true; i++) { |
| region.put("key-" + i, value); |
| } |
| } catch (OutOfOffHeapMemoryException e) { |
| ooohme = e; |
| } |
| assertNotNull(ooohme); |
| |
| await() |
| .until(() -> cache.isClosed() && !system.isConnected() && dm.isClosed()); |
| |
| // wait for cache instance to be nulled out |
| await() |
| .until(() -> cache.isClosed() && !system.isConnected()); |
| |
| // verify system was closed out due to OutOfOffHeapMemoryException |
| assertFalse(system.isConnected()); |
| InternalDistributedSystem ids = (InternalDistributedSystem) system; |
| try { |
| ids.getDistributionManager(); |
| fail( |
| "InternalDistributedSystem.getDistributionManager() should throw DistributedSystemDisconnectedException"); |
| } catch (DistributedSystemDisconnectedException expected) { |
| assertRootCause(expected, OutOfOffHeapMemoryException.class); |
| } |
| |
| // verify dm was closed out due to OutOfOffHeapMemoryException |
| assertTrue(dm.isClosed()); |
| try { |
| dm.throwIfDistributionStopped(); |
| fail( |
| "DistributionManager.throwIfDistributionStopped() should throw DistributedSystemDisconnectedException"); |
| } catch (DistributedSystemDisconnectedException expected) { |
| assertRootCause(expected, OutOfOffHeapMemoryException.class); |
| } |
| |
| // verify cache was closed out due to OutOfOffHeapMemoryException |
| assertTrue(cache.isClosed()); |
| try { |
| cache.getCancelCriterion().checkCancelInProgress(null); |
| fail( |
| "GemFireCacheImpl.getCancelCriterion().checkCancelInProgress should throw DistributedSystemDisconnectedException"); |
| } catch (DistributedSystemDisconnectedException expected) { |
| assertRootCause(expected, OutOfOffHeapMemoryException.class); |
| } |
| } |
| |
| private void assertRootCause(Throwable throwable, Class<?> expected) { |
| boolean passed = false; |
| Throwable cause = throwable.getCause(); |
| while (cause != null) { |
| if (cause.getClass().equals(expected)) { |
| passed = true; |
| break; |
| } |
| cause = cause.getCause(); |
| } |
| if (!passed) { |
| throw new AssertionError("Throwable does not contain expected root cause " + expected, |
| throwable); |
| } |
| } |
| |
| @Test |
| public void testOtherMembersSeeOutOfOffHeapMemoryMemberDisconnects() { |
| final int vmCount = Host.getHost(0).getVMCount(); |
| |
| final String name = getRegionName(); |
| final RegionShortcut shortcut = getRegionShortcut(); |
| final int biggerVM = 0; |
| final int smallerVM = 1; |
| |
| Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| OutOfOffHeapMemoryDUnitTest.isSmallerVM.set(true); |
| } |
| }); |
| |
| // create off-heap region in all members |
| for (int i = 0; i < vmCount; i++) { |
| Host.getHost(0).getVM(i).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| OutOfOffHeapMemoryDUnitTest.cache.set(getCache()); |
| OutOfOffHeapMemoryDUnitTest.system.set(getSystem()); |
| |
| final Region<Object, Object> region = OutOfOffHeapMemoryDUnitTest.cache.get() |
| .createRegionFactory(shortcut).setOffHeap(true).create(name); |
| assertNotNull(region); |
| } |
| }); |
| } |
| |
| // make sure there are vmCount+1 members total |
| for (int i = 0; i < vmCount; i++) { |
| Host.getHost(0).getVM(i).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| assertFalse(OutOfOffHeapMemoryDUnitTest.cache.get().isClosed()); |
| assertTrue(OutOfOffHeapMemoryDUnitTest.system.get().isConnected()); |
| |
| final int countMembersPlusLocator = vmCount + 1; // +1 for locator |
| final int countOtherMembers = vmCount - 1; // -1 one for self |
| |
| assertEquals(countMembersPlusLocator, |
| ((InternalDistributedSystem) OutOfOffHeapMemoryDUnitTest.system.get()) |
| .getDistributionManager().getDistributionManagerIds().size()); |
| assertEquals(countOtherMembers, |
| ((DistributedRegion) OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name)) |
| .getDistributionAdvisor().getNumProfiles()); |
| } |
| }); |
| } |
| |
| // perform puts in bigger member until smaller member goes OOOHME |
| Host.getHost(0).getVM(biggerVM).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| final long TIME_LIMIT = 30 * 1000; |
| final StopWatch stopWatch = new StopWatch(true); |
| |
| int countOtherMembers = vmCount - 1; // -1 for self |
| final int countOtherMembersMinusSmaller = vmCount - 1 - 1; // -1 for self, -1 for smallerVM |
| |
| final Region<Object, Object> region = |
| OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name); |
| for (int i = 0; countOtherMembers > countOtherMembersMinusSmaller; i++) { |
| region.put("key-" + i, new byte[1024]); |
| countOtherMembers = |
| ((DistributedRegion) OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name)) |
| .getDistributionAdvisor().getNumProfiles(); |
| assertTrue("puts failed to push member out of off-heap memory within time limit", |
| stopWatch.elapsedTimeMillis() < TIME_LIMIT); |
| } |
| assertEquals("Member did not depart from OutOfOffHeapMemory", countOtherMembersMinusSmaller, |
| countOtherMembers); |
| } |
| }); |
| |
| // verify that member with OOOHME closed |
| Host.getHost(0).getVM(smallerVM).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| assertTrue(OutOfOffHeapMemoryDUnitTest.cache.get().isClosed()); |
| assertFalse(OutOfOffHeapMemoryDUnitTest.system.get().isConnected()); |
| } |
| }); |
| |
| // verify that all other members noticed smaller member closed |
| for (int i = 0; i < vmCount; i++) { |
| if (i == smallerVM) { |
| continue; |
| } |
| Host.getHost(0).getVM(i).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| final int countMembersPlusLocator = vmCount + 1 - 1; // +1 for locator, -1 for OOOHME |
| // member |
| final int countOtherMembers = vmCount - 1 - 1; // -1 for self, -1 for OOOHME member |
| |
| await() |
| .until(numDistributionManagers(), equalTo(countMembersPlusLocator)); |
| await() |
| .until(numProfiles(), equalTo(countOtherMembers)); |
| |
| } |
| |
| private Callable<Integer> numProfiles() { |
| return () -> { |
| DistributedRegion dr = |
| (DistributedRegion) OutOfOffHeapMemoryDUnitTest.cache.get().getRegion(name); |
| return dr.getDistributionAdvisor().getNumProfiles(); |
| }; |
| } |
| |
| private Callable<Integer> numDistributionManagers() { |
| return () -> { |
| InternalDistributedSystem ids = |
| (InternalDistributedSystem) OutOfOffHeapMemoryDUnitTest.system.get(); |
| return ids.getDistributionManager().getDistributionManagerIds().size(); |
| }; |
| } |
| }); |
| } |
| } |
| |
| } |