| /*========================================================================= |
| * Copyright (c) 2010-2013 VMware, Inc. All rights reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. VMware products are covered by |
| * one or more patents listed at http://www.vmware.com/go/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache.management; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.AttributesMutator; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.CacheLoader; |
| import com.gemstone.gemfire.cache.CacheLoaderException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.LoaderHelper; |
| import com.gemstone.gemfire.cache.LowMemoryException; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionShortcut; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.client.PoolFactory; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.ServerOperationException; |
| import com.gemstone.gemfire.cache.control.ResourceManager; |
| import com.gemstone.gemfire.cache.management.MemoryThresholdsDUnitTest.Range; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.cache30.BridgeTestCase; |
| import com.gemstone.gemfire.cache30.CacheSerializableRunnable; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.AvailablePortHelper; |
| import com.gemstone.gemfire.internal.cache.DistributedRegion; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; |
| import com.gemstone.gemfire.internal.cache.ProxyBucketRegion; |
| import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; |
| import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType; |
| import com.gemstone.gemfire.internal.cache.control.MemoryEvent; |
| import com.gemstone.gemfire.internal.cache.control.MemoryThresholds.MemoryState; |
| import com.gemstone.gemfire.internal.cache.control.OffHeapMemoryMonitor; |
| import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor; |
| import com.gemstone.gemfire.internal.cache.control.ResourceListener; |
| import com.gemstone.gemfire.internal.cache.control.TestMemoryThresholdListener; |
| import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| |
| import dunit.AsyncInvocation; |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| |
| /** |
| * Tests the Off-Heap Memory thresholds of {@link ResourceManager} |
| * |
| * @author David Hoots |
| * @since 9.0 |
| */ |
| public class MemoryThresholdsOffHeapDUnitTest extends BridgeTestCase { |
| private static final long serialVersionUID = -684231183212051910L; |
| |
| final String expectedEx = LocalizedStrings.MemoryMonitor_MEMBER_ABOVE_CRITICAL_THRESHOLD.getRawText().replaceAll("\\{[0-9]+\\}", |
| ".*?"); |
| final String addExpectedExString = "<ExpectedException action=add>" + this.expectedEx + "</ExpectedException>"; |
| final String removeExpectedExString = "<ExpectedException action=remove>" + this.expectedEx + "</ExpectedException>"; |
| final String expectedBelow = LocalizedStrings.MemoryMonitor_MEMBER_BELOW_CRITICAL_THRESHOLD.getRawText().replaceAll( |
| "\\{[0-9]+\\}", ".*?"); |
| final String addExpectedBelow = "<ExpectedException action=add>" + this.expectedBelow + "</ExpectedException>"; |
| final String removeExpectedBelow = "<ExpectedException action=remove>" + this.expectedBelow + "</ExpectedException>"; |
| |
| public MemoryThresholdsOffHeapDUnitTest(String name) { |
| super(name); |
| } |
| |
| |
| |
| @Override |
| public void setUp() throws Exception { |
| addExpectedException(expectedEx); |
| addExpectedException(expectedBelow); |
| } |
| |
| |
| |
| @Override |
| public void tearDown2() throws Exception { |
| super.tearDown2(); |
| invokeInEveryVM(this.resetResourceManager); |
| } |
| |
| private SerializableCallable resetResourceManager = new SerializableCallable() { |
| public Object call() throws Exception { |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| Set<ResourceListener> listeners = irm.getResourceListeners(ResourceType.OFFHEAP_MEMORY); |
| Iterator<ResourceListener> it = listeners.iterator(); |
| while (it.hasNext()) { |
| ResourceListener<MemoryEvent> l = it.next(); |
| if (l instanceof TestMemoryThresholdListener) { |
| ((TestMemoryThresholdListener)l).resetThresholdCalls(); |
| } |
| } |
| return null; |
| } |
| }; |
| |
| /** |
| * Make sure appropriate events are delivered when moving between states. |
| * |
| * @throws Exception |
| */ |
| public void testEventDelivery() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); |
| final int port1 = ports[0]; |
| final int port2 = ports[1]; |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "offHeapEventDelivery"; |
| |
| startCacheServer(server1, port1, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| startCacheServer(server2, port2, mcastPort, 70f, 90f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| registerTestMemoryThresholdListener(server1); |
| registerTestMemoryThresholdListener(server2); |
| |
| // NORMAL -> EVICTION |
| setUsageAboveEvictionThreshold(server2, regionName); |
| verifyListenerValue(server1, MemoryState.EVICTION, 1, true); |
| verifyListenerValue(server2, MemoryState.EVICTION, 1, false); |
| |
| // EVICTION -> CRITICAL |
| setUsageAboveCriticalThreshold(server2, regionName); |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, true); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, false); |
| verifyListenerValue(server1, MemoryState.EVICTION, 2, true); |
| verifyListenerValue(server2, MemoryState.EVICTION, 2, false); |
| |
| // CRITICAL -> CRITICAL |
| server2.invoke(new SerializableCallable() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object call() throws Exception { |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.addExpectedExString); |
| getRootRegion().getSubregion(regionName).destroy("oh3"); |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.removeExpectedExString); |
| return null; |
| } |
| }); |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, true); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, false); |
| verifyListenerValue(server1, MemoryState.EVICTION, 2, true); |
| verifyListenerValue(server2, MemoryState.EVICTION, 2, false); |
| |
| // CRITICAL -> EVICTION |
| server2.invoke(new SerializableCallable() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object call() throws Exception { |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.addExpectedBelow); |
| getRootRegion().getSubregion(regionName).destroy("oh2"); |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.removeExpectedBelow); |
| return null; |
| } |
| }); |
| verifyListenerValue(server1, MemoryState.EVICTION, 3, true); |
| verifyListenerValue(server2, MemoryState.EVICTION, 3, false); |
| |
| // EVICTION -> EVICTION |
| server2.invoke(new SerializableCallable() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object call() throws Exception { |
| getRootRegion().getSubregion(regionName).put("oh6", new byte[20480]); |
| return null; |
| } |
| }); |
| verifyListenerValue(server1, MemoryState.EVICTION, 3, true); |
| verifyListenerValue(server2, MemoryState.EVICTION, 3, false); |
| |
| // EVICTION -> NORMAL |
| server2.invoke(new SerializableCallable() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object call() throws Exception { |
| getRootRegion().getSubregion(regionName).destroy("oh4"); |
| return null; |
| } |
| }); |
| |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, true); |
| verifyListenerValue(server1, MemoryState.EVICTION, 3, true); |
| verifyListenerValue(server1, MemoryState.NORMAL, 1, true); |
| |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, false); |
| verifyListenerValue(server2, MemoryState.EVICTION, 3, false); |
| verifyListenerValue(server2, MemoryState.NORMAL, 1, false); |
| } |
| |
| /** |
| * test that disabling threshold does not cause remote event and |
| * remote DISABLED events are delivered |
| * @throws Exception |
| */ |
| public void testDisabledThresholds() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); |
| final int port1 = ports[0]; |
| final int port2 = ports[1]; |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "offHeapDisabledThresholds"; |
| |
| startCacheServer(server1, port1, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| startCacheServer(server2, port2, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| registerTestMemoryThresholdListener(server1); |
| registerTestMemoryThresholdListener(server2); |
| |
| setUsageAboveEvictionThreshold(server1, regionName); |
| verifyListenerValue(server1, MemoryState.EVICTION, 0, false); |
| verifyListenerValue(server2, MemoryState.EVICTION, 0, true); |
| |
| setThresholds(server1, 70f, 0f); |
| verifyListenerValue(server1, MemoryState.EVICTION, 1, false); |
| verifyListenerValue(server2, MemoryState.EVICTION, 1, true); |
| |
| setUsageAboveCriticalThreshold(server1, regionName); |
| verifyListenerValue(server1, MemoryState.CRITICAL, 0, false); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 0, true); |
| |
| setThresholds(server1, 0f, 0f); |
| verifyListenerValue(server1, MemoryState.EVICTION_DISABLED, 1, false); |
| verifyListenerValue(server2, MemoryState.EVICTION_DISABLED, 1, true); |
| |
| setThresholds(server1, 0f, 90f); |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, false); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, true); |
| |
| //verify that stats on server2 are not changed by events on server1 |
| server2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| assertEquals(0, irm.getStats().getOffHeapEvictionStartEvents()); |
| assertEquals(0, irm.getStats().getOffHeapCriticalEvents()); |
| assertEquals(0, irm.getStats().getOffHeapCriticalThreshold()); |
| assertEquals(0, irm.getStats().getOffHeapEvictionThreshold()); |
| return null; |
| } |
| }); |
| } |
| |
| private void setUsageAboveCriticalThreshold(final VM vm, final String regionName) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().getLoggerI18n().fine(addExpectedExString); |
| Region region = getRootRegion().getSubregion(regionName); |
| if (!region.containsKey("oh1")) { |
| region.put("oh5", new byte[954204]); |
| } else { |
| region.put("oh5", new byte[122880]); |
| } |
| getCache().getLoggerI18n().fine(removeExpectedExString); |
| return null; |
| } |
| }); |
| } |
| |
| private void setUsageAboveEvictionThreshold(final VM vm, final String regionName) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().getLoggerI18n().fine(addExpectedBelow); |
| Region region = getRootRegion().getSubregion(regionName); |
| region.put("oh1", new byte[245760]); |
| region.put("oh2", new byte[184320]); |
| region.put("oh3", new byte[33488]); |
| region.put("oh4", new byte[378160]); |
| getCache().getLoggerI18n().fine(removeExpectedBelow); |
| return null; |
| } |
| }); |
| } |
| |
| private void setUsageBelowEviction(final VM vm, final String regionName) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().getLoggerI18n().fine(addExpectedBelow); |
| Region region = getRootRegion().getSubregion(regionName); |
| region.remove("oh1"); |
| region.remove("oh2"); |
| region.remove("oh3"); |
| region.remove("oh4"); |
| region.remove("oh5"); |
| getCache().getLoggerI18n().fine(removeExpectedBelow); |
| return null; |
| } |
| }); |
| } |
| |
| private void setThresholds(VM server, final float evictionThreshold, |
| final float criticalThreshold) { |
| |
| server.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| ResourceManager irm = getCache().getResourceManager(); |
| irm.setCriticalOffHeapPercentage(criticalThreshold); |
| irm.setEvictionOffHeapPercentage(evictionThreshold); |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * test that puts in a client are rejected when a remote VM crosses |
| * critical threshold |
| * @throws Exception |
| */ |
| public void testDistributedRegionRemoteClientPutRejection() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| final VM client = host.getVM(2); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); |
| final int port1 = ports[0]; |
| final int port2 = ports[1]; |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "offHeapDRRemoteClientPutReject"; |
| |
| startCacheServer(server1, port1, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| startCacheServer(server2, port2, mcastPort, 0f, 90f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| startClient(client, server1, port1, regionName); |
| |
| registerTestMemoryThresholdListener(server1); |
| registerTestMemoryThresholdListener(server2); |
| |
| doPuts(client, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/); |
| doPutAlls(client, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/, Range.DEFAULT); |
| |
| //make server2 critical |
| setUsageAboveCriticalThreshold(server2, regionName); |
| |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, true); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, false); |
| |
| //make sure that client puts are rejected |
| doPuts(client, regionName, true/*catchRejectedException*/, |
| false/*catchLowMemoryException*/); |
| doPutAlls(client, regionName, true/*catchRejectedException*/, |
| false/*catchLowMemoryException*/, new Range(Range.DEFAULT, Range.DEFAULT.width()+1)); |
| |
| setUsageBelowEviction(server2, regionName); |
| } |
| |
| public void testDistributedRegionRemotePutRejectionLocalDestroy() throws Exception { |
| doDistributedRegionRemotePutRejection(true, false); |
| } |
| |
| public void testDistributedRegionRemotePutRejectionCacheClose() throws Exception { |
| doDistributedRegionRemotePutRejection(false, true); |
| } |
| |
| public void testDistributedRegionRemotePutRejectionBelowThreshold() throws Exception { |
| doDistributedRegionRemotePutRejection(false, false); |
| } |
| |
| public void testGettersAndSetters() { |
| getSystem(getServerProperties(0)); |
| ResourceManager rm = getCache().getResourceManager(); |
| assertEquals(0.0f, rm.getCriticalOffHeapPercentage()); |
| assertEquals(0.0f, rm.getEvictionOffHeapPercentage()); |
| |
| rm.setEvictionOffHeapPercentage(50); |
| rm.setCriticalOffHeapPercentage(90); |
| |
| // verify |
| assertEquals(50.0f, rm.getEvictionOffHeapPercentage()); |
| assertEquals(90.0f, rm.getCriticalOffHeapPercentage()); |
| |
| getCache().createRegionFactory(RegionShortcut.REPLICATE_HEAP_LRU).create(getName()); |
| |
| assertEquals(50.0f, rm.getEvictionOffHeapPercentage()); |
| assertEquals(90.0f, rm.getCriticalOffHeapPercentage()); |
| } |
| |
| /** |
| * test that puts in a server are rejected when a remote VM crosses |
| * critical threshold |
| * @throws Exception |
| */ |
| private void doDistributedRegionRemotePutRejection(boolean localDestroy, boolean cacheClose) throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); |
| final int port1 = ports[0]; |
| final int port2 = ports[1]; |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "offHeapDRRemotePutRejection"; |
| |
| startCacheServer(server1, port1, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| startCacheServer(server2, port2, mcastPort, 0f, 90f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| registerTestMemoryThresholdListener(server1); |
| registerTestMemoryThresholdListener(server2); |
| |
| doPuts(server1, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/); |
| doPutAlls(server1, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/, Range.DEFAULT); |
| |
| //make server2 critical |
| setUsageAboveCriticalThreshold(server2, regionName); |
| |
| verifyListenerValue(server1, MemoryState.CRITICAL, 1, true); |
| verifyListenerValue(server2, MemoryState.CRITICAL, 1, false); |
| |
| //make sure that local server1 puts are rejected |
| doPuts(server1, regionName, false/*catchRejectedException*/, |
| true/*catchLowMemoryException*/); |
| Range r1 = new Range(Range.DEFAULT, Range.DEFAULT.width()+1); |
| doPutAlls(server1, regionName, false/*catchRejectedException*/, |
| true/*catchLowMemoryException*/, r1); |
| |
| if (localDestroy) { |
| //local destroy the region on sick member |
| server2.invoke(new SerializableCallable("local destroy") { |
| public Object call() throws Exception { |
| Region r = getRootRegion().getSubregion(regionName); |
| r.localDestroyRegion(); |
| return null; |
| } |
| }); |
| } else if (cacheClose) { |
| server2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getCache().close(); |
| return null; |
| } |
| }); |
| } else { |
| setUsageBelowEviction(server2, regionName); |
| } |
| |
| //wait for remote region destroyed message to be processed |
| server1.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "remote localRegionDestroyed message not received"; |
| } |
| public boolean done() { |
| DistributedRegion dr = (DistributedRegion)getRootRegion(). |
| getSubregion(regionName); |
| return dr.getMemoryThresholdReachedMembers().size() == 0; |
| } |
| }; |
| waitForCriterion(wc, 10000, 10, true); |
| return null; |
| } |
| }); |
| |
| //make sure puts succeed |
| doPuts(server1, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/); |
| Range r2 = new Range(r1, r1.width()+1); |
| doPutAlls(server1, regionName, false/*catchRejectedException*/, |
| false/*catchLowMemoryException*/, r2); |
| } |
| |
| /** |
| * Test that DistributedRegion cacheLoade and netLoad are passed through to the |
| * calling thread if the local VM is in a critical state. Once the VM has moved |
| * to a safe state then test that they are allowed. |
| * @throws Exception |
| */ |
| public void testDRLoadRejection() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM replicate1 = host.getVM(1); |
| final VM replicate2 = host.getVM(2); |
| final String rName = getUniqueName(); |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| |
| // Make sure the desired VMs will have a fresh DS. |
| AsyncInvocation d1 = replicate1.invokeAsync(DistributedTestCase.class, "disconnectFromDS"); |
| AsyncInvocation d2 = replicate2.invokeAsync(DistributedTestCase.class, "disconnectFromDS"); |
| d1.join(); |
| assertFalse(d1.exceptionOccurred()); |
| d2.join(); |
| assertFalse(d2.exceptionOccurred()); |
| CacheSerializableRunnable establishConnectivity = new CacheSerializableRunnable("establishcConnectivity") { |
| @SuppressWarnings("synthetic-access") |
| @Override |
| public void run2() throws CacheException { |
| getSystem(getServerProperties(mcastPort)); |
| } |
| }; |
| replicate1.invoke(establishConnectivity); |
| replicate2.invoke(establishConnectivity); |
| |
| CacheSerializableRunnable createRegion = new CacheSerializableRunnable("create DistributedRegion") { |
| @Override |
| public void run2() throws CacheException { |
| // Assert some level of connectivity |
| InternalDistributedSystem ds = getSystem(getServerProperties(mcastPort)); |
| assertTrue(ds.getDistributionManager().getNormalDistributionManagerIds().size() >= 1); |
| |
| InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager(); |
| irm.setCriticalOffHeapPercentage(90f); |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.DISTRIBUTED_ACK); |
| af.setDataPolicy(DataPolicy.REPLICATE); |
| af.setOffHeap(true); |
| Region region = getCache().createRegion(rName, af.create()); |
| } |
| }; |
| replicate1.invoke(createRegion); |
| replicate2.invoke(createRegion); |
| |
| replicate1.invoke(addExpectedException); |
| replicate2.invoke(addExpectedException); |
| |
| final Integer expected = (Integer)replicate1.invoke(new SerializableCallable("test Local DistributedRegion Load") { |
| public Object call() throws Exception { |
| final DistributedRegion r = (DistributedRegion) getCache().getRegion(rName); |
| AttributesMutator<Integer, String> am = r.getAttributesMutator(); |
| am.setCacheLoader(new CacheLoader<Integer, String>() { |
| final AtomicInteger numLoaderInvocations = new AtomicInteger(0); |
| public String load(LoaderHelper<Integer, String> helper) throws CacheLoaderException { |
| Integer expectedInvocations = (Integer)helper.getArgument(); |
| final int actualInvocations = this.numLoaderInvocations.getAndIncrement(); |
| if (expectedInvocations.intValue() != actualInvocations) { |
| throw new CacheLoaderException("Expected " + expectedInvocations |
| + " invocations, actual is " + actualInvocations); |
| } |
| return helper.getKey().toString(); |
| } |
| public void close() {} |
| }); |
| |
| int expectedInvocations = 0; |
| final OffHeapMemoryMonitor ohmm = ((InternalResourceManager)getCache().getResourceManager()).getOffHeapMonitor(); |
| assertFalse(ohmm.getState().isCritical()); |
| { |
| Integer k = new Integer(1); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| |
| r.put("oh1", new byte[838860]); |
| r.put("oh3", new byte[157287]); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| { |
| Integer k = new Integer(2); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| |
| r.destroy("oh3"); |
| wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return !r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| { |
| Integer k = new Integer(3); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| return new Integer(expectedInvocations); |
| } |
| }); |
| |
| final CacheSerializableRunnable validateData1 = new CacheSerializableRunnable("Validate data 1") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getCache().getRegion(rName); |
| Integer i1 = new Integer(1); |
| assertTrue(r.containsKey(i1)); |
| assertNotNull(r.getEntry(i1)); |
| Integer i2 = new Integer(2); |
| assertFalse(r.containsKey(i2)); |
| assertNull(r.getEntry(i2)); |
| Integer i3 = new Integer(3); |
| assertTrue(r.containsKey(i3)); |
| assertNotNull(r.getEntry(i3)); |
| } |
| }; |
| replicate1.invoke(validateData1); |
| replicate2.invoke(validateData1); |
| |
| replicate2.invoke(new SerializableCallable("test DistributedRegion netLoad") { |
| public Object call() throws Exception { |
| final DistributedRegion r = (DistributedRegion) getCache().getRegion(rName); |
| final OffHeapMemoryMonitor ohmm = ((InternalResourceManager)getCache().getResourceManager()).getOffHeapMonitor(); |
| assertFalse(ohmm.getState().isCritical()); |
| |
| int expectedInvocations = expected.intValue(); |
| { |
| Integer k = new Integer(4); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| |
| // Place in a critical state for the next test |
| r.put("oh3", new byte[157287]); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| { |
| Integer k = new Integer(5); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| |
| r.destroy("oh3"); |
| wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return !r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| { |
| Integer k = new Integer(6); |
| assertEquals(k.toString(), r.get(k, new Integer(expectedInvocations++))); |
| } |
| return new Integer(expectedInvocations); |
| } |
| }); |
| |
| replicate1.invoke(removeExpectedException); |
| replicate2.invoke(removeExpectedException); |
| |
| final CacheSerializableRunnable validateData2 = new CacheSerializableRunnable("Validate data 2") { |
| @Override |
| public void run2() throws CacheException { |
| Region<Integer, String> r = getCache().getRegion(rName); |
| Integer i4 = new Integer(4); |
| assertTrue(r.containsKey(i4)); |
| assertNotNull(r.getEntry(i4)); |
| Integer i5 = new Integer(5); |
| assertFalse(r.containsKey(i5)); |
| assertNull(r.getEntry(i5)); |
| Integer i6 = new Integer(6); |
| assertTrue(r.containsKey(i6)); |
| assertNotNull(r.getEntry(i6)); |
| } |
| }; |
| replicate1.invoke(validateData2); |
| replicate2.invoke(validateData2); |
| } |
| |
| |
| private SerializableRunnable addExpectedException = new SerializableRunnable |
| ("addExpectedEx") { |
| public void run() { |
| getCache().getLoggerI18n().fine(addExpectedExString); |
| getCache().getLoggerI18n().fine(addExpectedBelow); |
| }; |
| }; |
| |
| private SerializableRunnable removeExpectedException = new SerializableRunnable |
| ("removeExpectedException") { |
| public void run() { |
| getCache().getLoggerI18n().fine(removeExpectedExString); |
| getCache().getLoggerI18n().fine(removeExpectedBelow); |
| }; |
| }; |
| |
| public void testPR_RemotePutRejectionLocalDestroy() throws Exception { |
| prRemotePutRejection(false, true, false); |
| } |
| |
| public void testPR_RemotePutRejectionCacheClose() throws Exception { |
| prRemotePutRejection(true, false, false); |
| } |
| |
| public void testPR_RemotePutRejection() throws Exception { |
| prRemotePutRejection(false, false, false); |
| } |
| |
| public void testPR_RemotePutRejectionLocalDestroyWithTx() throws Exception { |
| prRemotePutRejection(false, true, true); |
| } |
| |
| public void testPR_RemotePutRejectionCacheCloseWithTx() throws Exception { |
| prRemotePutRejection(true, false, true); |
| } |
| |
| public void testPR_RemotePutRejectionWithTx() throws Exception { |
| prRemotePutRejection(false, false, true); |
| } |
| |
| private void prRemotePutRejection(boolean cacheClose, boolean localDestroy, final boolean useTx) throws Exception { |
| final Host host = Host.getHost(0); |
| final VM accessor = host.getVM(0); |
| final VM servers[] = new VM[3]; |
| servers[0] = host.getVM(1); |
| servers[1] = host.getVM(2); |
| servers[2] = host.getVM(3); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "offHeapPRRemotePutRejection"; |
| final int redundancy = 1; |
| |
| startCacheServer(servers[0], ports[0], mcastPort, 0f, 90f, |
| regionName, true/*createPR*/, false/*notifyBySubscription*/, redundancy); |
| startCacheServer(servers[1], ports[1], mcastPort, 0f, 90f, |
| regionName, true/*createPR*/, false/*notifyBySubscription*/, redundancy); |
| startCacheServer(servers[2], ports[2], mcastPort, 0f, 90f, |
| regionName, true/*createPR*/, false/*notifyBySubscription*/, redundancy); |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getSystem(getServerProperties(mcastPort)); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(redundancy); |
| paf.setLocalMaxMemory(0); |
| paf.setTotalNumBuckets(11); |
| factory.setPartitionAttributes(paf.create()); |
| factory.setOffHeap(true); |
| createRegion(regionName, factory.create()); |
| return null; |
| } |
| }); |
| |
| doPuts(accessor, regionName, false, false); |
| final Range r1 = Range.DEFAULT; |
| doPutAlls(accessor, regionName, false, false, r1); |
| |
| servers[0].invoke(addExpectedException); |
| servers[1].invoke(addExpectedException); |
| servers[2].invoke(addExpectedException); |
| setUsageAboveCriticalThreshold(servers[0], regionName); |
| |
| final Set<InternalDistributedMember> criticalMembers = (Set) servers[0].invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| final PartitionedRegion pr = (PartitionedRegion)getRootRegion().getSubregion(regionName); |
| final int hashKey = PartitionedRegionHelper.getHashKey(pr, null, "oh5", null, null); |
| return pr.getRegionAdvisor().getBucketOwners(hashKey); |
| } |
| }); |
| |
| accessor.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| final PartitionedRegion pr = (PartitionedRegion)getRootRegion().getSubregion(regionName); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "remote bucket not marked sick"; |
| } |
| public boolean done() { |
| boolean keyFoundOnSickMember = false; |
| boolean caughtException = false; |
| for (int i=0; i<20; i++) { |
| Integer key = Integer.valueOf(i); |
| int hKey = PartitionedRegionHelper.getHashKey(pr, null, key, null, null); |
| Set<InternalDistributedMember> owners = pr.getRegionAdvisor().getBucketOwners(hKey); |
| final boolean hasCriticalOwners = owners.removeAll(criticalMembers); |
| if (hasCriticalOwners) { |
| keyFoundOnSickMember = true; |
| try { |
| if (useTx) getCache().getCacheTransactionManager().begin(); |
| pr.getCache().getLogger().fine("SWAP:putting in tx:"+useTx); |
| pr.put(key, "value"); |
| if (useTx) getCache().getCacheTransactionManager().commit(); |
| } catch (LowMemoryException ex) { |
| caughtException = true; |
| if (useTx) getCache().getCacheTransactionManager().rollback(); |
| } |
| } else { |
| //puts on healthy member should continue |
| pr.put(key, "value"); |
| } |
| } |
| return keyFoundOnSickMember && caughtException; |
| } |
| }; |
| waitForCriterion(wc, 10000, 10, true); |
| return null; |
| } |
| }); |
| |
| { |
| Range r2 = new Range(r1, r1.width()+1); |
| doPutAlls(accessor, regionName, false, true, r2); |
| } |
| |
| // Find all VMs that have a critical region |
| SerializableCallable getMyId = new SerializableCallable() { |
| public Object call() throws Exception { |
| return ((GemFireCacheImpl)getCache()).getMyId(); |
| } |
| }; |
| final Set<VM> criticalServers = new HashSet<VM>(); |
| for (final VM server : servers) { |
| DistributedMember member = (DistributedMember) server.invoke(getMyId); |
| if (criticalMembers.contains(member)) { |
| criticalServers.add(server); |
| } |
| } |
| |
| if (localDestroy) { |
| //local destroy the region on sick members |
| for (final VM vm : criticalServers) { |
| vm.invoke(new SerializableCallable("local destroy sick member") { |
| public Object call() throws Exception { |
| Region r = getRootRegion().getSubregion(regionName); |
| getLogWriter().info("PRLocalDestroy"); |
| r.localDestroyRegion(); |
| return null; |
| } |
| }); |
| } |
| } else if (cacheClose) { |
| // close cache on sick members |
| for (final VM vm : criticalServers) { |
| vm.invoke(new SerializableCallable("close cache sick member") { |
| public Object call() throws Exception { |
| getCache().close(); |
| return null; |
| } |
| }); |
| } |
| } else { |
| setUsageBelowEviction(servers[0], regionName); |
| servers[0].invoke(removeExpectedException); |
| servers[1].invoke(removeExpectedException); |
| servers[2].invoke(removeExpectedException); |
| } |
| |
| //do put all in a loop to allow distribution of message |
| accessor.invoke(new SerializableCallable("Put in a loop") { |
| public Object call() throws Exception { |
| final Region r = getRootRegion().getSubregion(regionName); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "pr should have gone un-critical"; |
| } |
| public boolean done() { |
| boolean done = true; |
| for (int i=0; i<20; i++) { |
| try { |
| r.put(i,"value"); |
| } catch (LowMemoryException e) { |
| //expected |
| done = false; |
| } |
| } |
| return done; |
| } |
| }; |
| waitForCriterion(wc, 10000, 10, true); |
| return null; |
| } |
| }); |
| doPutAlls(accessor, regionName, false, false, r1); |
| } |
| |
| /** |
| * Test that a Partitioned Region loader invocation is rejected |
| * if the VM with the bucket is in a critical state. |
| * @throws Exception |
| */ |
| public void testPRLoadRejection() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM accessor = host.getVM(1); |
| final VM ds1 = host.getVM(2); |
| final String rName = getUniqueName(); |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| |
| // Make sure the desired VMs will have a fresh DS. |
| AsyncInvocation d0 = accessor.invokeAsync(DistributedTestCase.class, "disconnectFromDS"); |
| AsyncInvocation d1 = ds1.invokeAsync(DistributedTestCase.class, "disconnectFromDS"); |
| d0.join(); |
| assertFalse(d0.exceptionOccurred()); |
| d1.join(); |
| assertFalse(d1.exceptionOccurred()); |
| CacheSerializableRunnable establishConnectivity = new CacheSerializableRunnable("establishcConnectivity") { |
| @Override |
| public void run2() throws CacheException { getSystem(); } |
| }; |
| ds1.invoke(establishConnectivity); |
| accessor.invoke(establishConnectivity); |
| |
| ds1.invoke(createPR(rName, false, mcastPort)); |
| accessor.invoke(createPR(rName, true, mcastPort)); |
| |
| final AtomicInteger expectedInvocations = new AtomicInteger(0); |
| |
| Integer ex = (Integer) accessor.invoke(new SerializableCallable("Invoke loader from accessor, non-critical") { |
| public Object call() throws Exception { |
| Region<Integer, String> r = getCache().getRegion(rName); |
| Integer k = new Integer(1); |
| Integer expectedInvocations0 = new Integer(expectedInvocations.getAndIncrement()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations0)); // should load for new key |
| assertTrue(r.containsKey(k)); |
| Integer expectedInvocations1 = new Integer(expectedInvocations.get()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations1)); // no load |
| assertEquals(k.toString(), r.get(k, expectedInvocations1)); // no load |
| return expectedInvocations1; |
| } |
| }); |
| expectedInvocations.set(ex.intValue()); |
| |
| ex = (Integer)ds1.invoke(new SerializableCallable("Invoke loader from datastore, non-critical") { |
| public Object call() throws Exception { |
| Region<Integer, String> r = getCache().getRegion(rName); |
| Integer k = new Integer(2); |
| Integer expectedInvocations1 = new Integer(expectedInvocations.getAndIncrement()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations1)); // should load for new key |
| assertTrue(r.containsKey(k)); |
| Integer expectedInvocations2 = new Integer(expectedInvocations.get()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations2)); // no load |
| assertEquals(k.toString(), r.get(k, expectedInvocations2)); // no load |
| String oldVal = r.remove(k); |
| assertFalse(r.containsKey(k)); |
| assertEquals(k.toString(), oldVal); |
| return expectedInvocations2; |
| } |
| }); |
| expectedInvocations.set(ex.intValue()); |
| |
| accessor.invoke(addExpectedException); |
| ds1.invoke(addExpectedException); |
| |
| ex = (Integer)ds1.invoke(new SerializableCallable("Set critical state, assert local load behavior") { |
| public Object call() throws Exception { |
| final OffHeapMemoryMonitor ohmm = ((InternalResourceManager)getCache().getResourceManager()).getOffHeapMonitor(); |
| final PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(rName); |
| final RegionAdvisor advisor = pr.getRegionAdvisor(); |
| |
| pr.put("oh1", new byte[838860]); |
| pr.put("oh3", new byte[157287]); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| for (final ProxyBucketRegion bucket : advisor.getProxyBucketArray()) { |
| if (bucket.isBucketSick()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| |
| final Integer k = new Integer(2); // reload with same key again and again |
| final Integer expectedInvocations3 = new Integer(expectedInvocations.getAndIncrement()); |
| assertEquals(k.toString(), pr.get(k, expectedInvocations3)); // load |
| assertFalse(pr.containsKey(k)); |
| Integer expectedInvocations4 = new Integer(expectedInvocations.getAndIncrement()); |
| assertEquals(k.toString(), pr.get(k, expectedInvocations4)); // load |
| assertFalse(pr.containsKey(k)); |
| Integer expectedInvocations5 = new Integer(expectedInvocations.get()); |
| assertEquals(k.toString(), pr.get(k, expectedInvocations5)); // load |
| assertFalse(pr.containsKey(k)); |
| return expectedInvocations5; |
| } |
| }); |
| expectedInvocations.set(ex.intValue()); |
| |
| ex = (Integer)accessor.invoke(new SerializableCallable("During critical state on datastore, assert accesor load behavior") { |
| public Object call() throws Exception { |
| final Integer k = new Integer(2); // reload with same key again and again |
| Integer expectedInvocations6 = new Integer(expectedInvocations.incrementAndGet()); |
| Region<Integer, String> r = getCache().getRegion(rName); |
| assertEquals(k.toString(), r.get(k, expectedInvocations6)); // load |
| assertFalse(r.containsKey(k)); |
| Integer expectedInvocations7 = new Integer(expectedInvocations.incrementAndGet()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations7)); // load |
| assertFalse(r.containsKey(k)); |
| return expectedInvocations7; |
| } |
| }); |
| expectedInvocations.set(ex.intValue()); |
| |
| ex = (Integer)ds1.invoke(new SerializableCallable("Set safe state on datastore, assert local load behavior") { |
| public Object call() throws Exception { |
| final PartitionedRegion r = (PartitionedRegion) getCache().getRegion(rName); |
| |
| r.destroy("oh3"); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return !r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| |
| Integer k = new Integer(3); // same key as previously used, this time is should stick |
| Integer expectedInvocations8 = new Integer(expectedInvocations.incrementAndGet()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations8)); // last load for 3 |
| assertTrue(r.containsKey(k)); |
| return expectedInvocations8; |
| } |
| }); |
| expectedInvocations.set(ex.intValue()); |
| |
| accessor.invoke(new SerializableCallable("Data store in safe state, assert load behavior, accessor sets critical state, assert load behavior") { |
| public Object call() throws Exception { |
| final OffHeapMemoryMonitor ohmm = ((InternalResourceManager)getCache().getResourceManager()).getOffHeapMonitor(); |
| assertFalse(ohmm.getState().isCritical()); |
| Integer k = new Integer(4); |
| Integer expectedInvocations9 = new Integer(expectedInvocations.incrementAndGet()); |
| final PartitionedRegion r = (PartitionedRegion) getCache().getRegion(rName); |
| assertEquals(k.toString(), r.get(k, expectedInvocations9)); // load for 4 |
| assertTrue(r.containsKey(k)); |
| assertEquals(k.toString(), r.get(k, expectedInvocations9)); // no load |
| |
| // Go critical in accessor |
| r.put("oh3", new byte[157287]); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return r.memoryThresholdReached.get(); |
| } |
| }; |
| |
| k = new Integer(5); |
| Integer expectedInvocations10 = new Integer(expectedInvocations.incrementAndGet()); |
| assertEquals(k.toString(), r.get(k, expectedInvocations10)); // load for key 5 |
| assertTrue(r.containsKey(k)); |
| assertEquals(k.toString(), r.get(k, expectedInvocations10)); // no load |
| |
| // Clean up critical state |
| r.destroy("oh3"); |
| wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return !ohmm.getState().isCritical(); |
| } |
| }; |
| return expectedInvocations10; |
| } |
| }); |
| |
| accessor.invoke(removeExpectedException); |
| ds1.invoke(removeExpectedException); |
| } |
| |
| private CacheSerializableRunnable createPR(final String rName, final boolean accessor, final int mcastPort) { |
| return new CacheSerializableRunnable("create PR accessor") { |
| @Override |
| public void run2() throws CacheException { |
| // Assert some level of connectivity |
| getSystem(getServerProperties(mcastPort)); |
| InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager(); |
| irm.setCriticalOffHeapPercentage(90f); |
| AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>(); |
| if (!accessor) { |
| af.setCacheLoader(new CacheLoader<Integer, String>() { |
| final AtomicInteger numLoaderInvocations = new AtomicInteger(0); |
| public String load(LoaderHelper<Integer, String> helper) throws CacheLoaderException { |
| Integer expectedInvocations = (Integer)helper.getArgument(); |
| final int actualInvocations = this.numLoaderInvocations.getAndIncrement(); |
| if (expectedInvocations.intValue() != actualInvocations) { |
| throw new CacheLoaderException("Expected " + expectedInvocations |
| + " invocations, actual is " + actualInvocations); |
| } |
| return helper.getKey().toString(); |
| } |
| public void close() {} |
| }); |
| |
| af.setPartitionAttributes(new PartitionAttributesFactory().create()); |
| } else { |
| af.setPartitionAttributes(new PartitionAttributesFactory().setLocalMaxMemory(0).create()); |
| } |
| af.setOffHeap(true); |
| getCache().createRegion(rName, af.create()); |
| } |
| }; |
| } |
| |
| /** |
| * Test that LocalRegion cache Loads are not stored in the Region |
| * if the VM is in a critical state, then test that they are allowed |
| * once the VM is no longer critical |
| * @throws Exception |
| */ |
| public void testLRLoadRejection() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM vm = host.getVM(2); |
| final String rName = getUniqueName(); |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| |
| vm.invoke(DistributedTestCase.class, "disconnectFromDS"); |
| |
| vm.invoke(new CacheSerializableRunnable("test LocalRegion load passthrough when critical") { |
| @Override |
| public void run2() throws CacheException { |
| getSystem(getServerProperties(mcastPort)); |
| InternalResourceManager irm = (InternalResourceManager)getCache().getResourceManager(); |
| final OffHeapMemoryMonitor ohmm = irm.getOffHeapMonitor(); |
| irm.setCriticalOffHeapPercentage(90f); |
| AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>(); |
| af.setScope(Scope.LOCAL); |
| af.setOffHeap(true); |
| final AtomicInteger numLoaderInvocations = new AtomicInteger(0); |
| af.setCacheLoader(new CacheLoader<Integer, String>() { |
| public String load(LoaderHelper<Integer, String> helper) |
| throws CacheLoaderException { |
| numLoaderInvocations.incrementAndGet(); |
| return helper.getKey().toString(); |
| } |
| public void close() {} |
| }); |
| final LocalRegion r = (LocalRegion) getCache().createRegion(rName, af.create()); |
| |
| assertFalse(ohmm.getState().isCritical()); |
| int expectedInvocations = 0; |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| { |
| Integer k = new Integer(1); |
| assertEquals(k.toString(), r.get(k)); |
| } |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| expectedInvocations++; expectedInvocations++; |
| r.getAll(createRanges(10, 12)); |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| |
| getCache().getLoggerI18n().fine(addExpectedExString); |
| r.put("oh1", new byte[838860]); |
| r.put("oh3", new byte[157287]); |
| getCache().getLoggerI18n().fine(removeExpectedExString); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| { |
| Integer k = new Integer(2); |
| assertEquals(k.toString(), r.get(k)); |
| } |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| expectedInvocations++; expectedInvocations++; |
| r.getAll(createRanges(13, 15)); |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| |
| getCache().getLoggerI18n().fine(addExpectedBelow); |
| r.destroy("oh3"); |
| getCache().getLoggerI18n().fine(removeExpectedBelow); |
| wc = new WaitCriterion() { |
| public String description() { |
| return "verify critical state"; |
| } |
| public boolean done() { |
| return !r.memoryThresholdReached.get(); |
| } |
| }; |
| waitForCriterion(wc, 3000, 100, true); |
| |
| { |
| Integer k = new Integer(3); |
| assertEquals(k.toString(), r.get(k)); |
| } |
| assertEquals(expectedInvocations++, numLoaderInvocations.get()); |
| expectedInvocations++; expectedInvocations++; |
| r.getAll(createRanges(16, 18)); |
| assertEquals(expectedInvocations, numLoaderInvocations.get()); |
| |
| // Do extra validation that the entry doesn't exist in the local region |
| for (Integer i: createRanges(2, 2, 13, 15)) { |
| if (r.containsKey(i)) { |
| fail("Expected containsKey return false for key" + i); |
| } |
| if (r.getEntry(i) != null) { |
| fail("Expected getEntry to return null for key" + i); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** Create a list of integers consisting of the ranges defined by the provided |
| * argument e.g.. createRanges(1, 4, 10, 12) means create ranges 1 through 4 and |
| * 10 through 12 and should yield the list: |
| * 1, 2, 3, 4, 10, 11, 12 |
| */ |
| public static List<Integer> createRanges(int... startEnds) { |
| assert startEnds.length % 2 == 0; |
| ArrayList<Integer> ret = new ArrayList<Integer>(); |
| for (int si=0; si<startEnds.length; si++) { |
| final int start = startEnds[si++]; |
| final int end = startEnds[si]; |
| assert end >= start; |
| ret.ensureCapacity(ret.size() + ((end-start)+1)); |
| for (int i=start; i<=end; i++) { |
| ret.add(new Integer(i)); |
| } |
| } |
| return ret; |
| } |
| |
| public void testCleanAdvisorClose() throws Exception { |
| final Host host = Host.getHost(0); |
| final VM server1 = host.getVM(0); |
| final VM server2 = host.getVM(1); |
| final VM server3 = host.getVM(2); |
| |
| final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); |
| final int port1 = ports[0]; |
| final int port2 = ports[1]; |
| final int port3 = ports[2]; |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| final String regionName = "testEventOrger"; |
| |
| startCacheServer(server1, port1, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| startCacheServer(server2, port2, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| verifyProfiles(server1, 1); |
| verifyProfiles(server2, 1); |
| |
| server2.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| closeCache(); |
| return null; |
| } |
| }); |
| |
| verifyProfiles(server1, 0); |
| |
| startCacheServer(server3, port3, mcastPort, 0f, 0f, |
| regionName, false/*createPR*/, false/*notifyBySubscription*/, 0); |
| |
| verifyProfiles(server1, 1); |
| verifyProfiles(server3, 1); |
| } |
| |
| public void testPRClientPutRejection() throws Exception { |
| doClientServerTest("parRegReject", true/*createPR*/); |
| } |
| |
| public void testDistributedRegionClientPutRejection() throws Exception { |
| doClientServerTest("distrReject", false/*createPR*/); |
| } |
| |
| private void doPuts(VM vm, final String regionName, |
| final boolean catchServerException, final boolean catchLowMemoryException) { |
| |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getRootRegion().getSubregion(regionName); |
| try { |
| r.put(Integer.valueOf(0), "value-1"); |
| if (catchServerException || catchLowMemoryException) { |
| fail("An expected ResourceException was not thrown"); |
| } |
| } catch (ServerOperationException ex) { |
| if (!catchServerException) { |
| fail("Unexpected exception: ", ex); |
| } |
| if (!(ex.getCause() instanceof LowMemoryException)) { |
| fail("Unexpected exception: ", ex); |
| } |
| } catch (LowMemoryException low) { |
| if (!catchLowMemoryException) { |
| fail("Unexpected exception: ", low); |
| } |
| } |
| return null; |
| } |
| }); |
| } |
| |
| private void doPutAlls(VM vm, final String regionName, |
| final boolean catchServerException, final boolean catchLowMemoryException, |
| final Range rng) { |
| |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| Region r = getRootRegion().getSubregion(regionName); |
| Map<Integer, String> temp = new HashMap<Integer, String>(); |
| for (int i=rng.start; i<rng.end; i++) { |
| Integer k = Integer.valueOf(i); |
| temp.put(k, "value-"+i); |
| } |
| try { |
| r.putAll(temp); |
| if (catchServerException || catchLowMemoryException) { |
| fail("An expected ResourceException was not thrown"); |
| } |
| for (Map.Entry<Integer, String> me: temp.entrySet()) { |
| assertEquals(me.getValue(), r.get(me.getKey())); |
| } |
| } catch (ServerOperationException ex) { |
| if (!catchServerException) { |
| fail("Unexpected exception: ", ex); |
| } |
| if (!(ex.getCause() instanceof LowMemoryException)) { |
| fail("Unexpected exception: ", ex); |
| } |
| for(Integer me: temp.keySet()) { |
| assertFalse("Key " + me + " should not exist", r.containsKey(me)); |
| } |
| } catch (LowMemoryException low) { |
| getLogWriter().info("Caught LowMemoryException", low); |
| if (!catchLowMemoryException) { |
| fail("Unexpected exception: ", low); |
| } |
| for(Integer me: temp.keySet()) { |
| assertFalse("Key " + me + " should not exist", r.containsKey(me)); |
| } |
| } |
| return null; |
| } |
| }); |
| } |
| |
| private void doClientServerTest(final String regionName, boolean createPR) |
| throws Exception { |
| //create region on the server |
| final Host host = Host.getHost(0); |
| final VM server = host.getVM(0); |
| final VM client = host.getVM(1); |
| |
| final int port = AvailablePortHelper.getRandomAvailableTCPPort(); |
| final int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); |
| startCacheServer(server, port, mcastPort, 0f, 90f, |
| regionName, createPR, false, 0); |
| startClient(client, server, port, regionName); |
| doPuts(client, regionName, false/*catchServerException*/, |
| false/*catchLowMemoryException*/); |
| doPutAlls(client, regionName, false/*catchServerException*/, |
| false/*catchLowMemoryException*/, Range.DEFAULT); |
| |
| //make the region sick in the server |
| server.invoke(new SerializableRunnable() { |
| public void run() { |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| final OffHeapMemoryMonitor ohm = irm.getOffHeapMonitor(); |
| assertTrue(ohm.getState().isNormal()); |
| getCache().getLoggerI18n().fine(addExpectedExString); |
| getRootRegion().getSubregion(regionName).put(1, new byte[943720]); |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Expected to go critical"; |
| } |
| |
| @Override |
| public boolean done() { |
| return ohm.getState().isCritical(); |
| } |
| }; |
| waitForCriterion(wc, 5000, 100, true); |
| getCache().getLoggerI18n().fine(removeExpectedExString); |
| return; |
| } |
| }); |
| |
| //make sure client puts are rejected |
| doPuts(client, regionName, true/*catchServerException*/, |
| false/*catchLowMemoryException*/); |
| doPutAlls(client, regionName, true/*catchServerException*/, |
| false/*catchLowMemoryException*/, new Range(Range.DEFAULT, Range.DEFAULT.width()+1)); |
| |
| //make the region healthy in the server |
| server.invoke(new SerializableRunnable() { |
| public void run() { |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| final OffHeapMemoryMonitor ohm = irm.getOffHeapMonitor(); |
| assertTrue(ohm.getState().isCritical()); |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.addExpectedBelow); |
| getRootRegion().getSubregion(regionName).destroy(1); |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Expected to go normal"; |
| } |
| |
| @Override |
| public boolean done() { |
| return ohm.getState().isNormal(); |
| } |
| }; |
| waitForCriterion(wc, 5000, 100, true); |
| getCache().getLogger().fine(MemoryThresholdsOffHeapDUnitTest.this.removeExpectedBelow); |
| return; |
| } |
| }); |
| } |
| |
| private void registerTestMemoryThresholdListener(VM vm) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| TestMemoryThresholdListener listener = new TestMemoryThresholdListener(); |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| irm.addResourceListener(ResourceType.OFFHEAP_MEMORY, listener); |
| assertTrue(irm.getResourceListeners(ResourceType.OFFHEAP_MEMORY).contains(listener)); |
| return null; |
| } |
| }); |
| } |
| |
| private void startCacheServer(VM server, final int port, final int mcastPort, |
| final float evictionThreshold, final float criticalThreshold, final String regionName, |
| final boolean createPR, final boolean notifyBySubscription, final int prRedundancy) throws Exception { |
| |
| server.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getSystem(getServerProperties(mcastPort)); |
| GemFireCacheImpl cache = (GemFireCacheImpl)getCache(); |
| |
| InternalResourceManager irm = cache.getResourceManager(); |
| irm.setEvictionOffHeapPercentage(evictionThreshold); |
| irm.setCriticalOffHeapPercentage(criticalThreshold); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| if (createPR) { |
| PartitionAttributesFactory paf = new PartitionAttributesFactory(); |
| paf.setRedundantCopies(prRedundancy); |
| paf.setTotalNumBuckets(11); |
| factory.setPartitionAttributes(paf.create()); |
| factory.setOffHeap(true); |
| } else { |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setOffHeap(true); |
| } |
| Region region = createRegion(regionName, factory.create()); |
| if (createPR) { |
| assertTrue(region instanceof PartitionedRegion); |
| } else { |
| assertTrue(region instanceof DistributedRegion); |
| } |
| CacheServer cacheServer = getCache().addCacheServer(); |
| cacheServer.setPort(port); |
| cacheServer.setNotifyBySubscription(notifyBySubscription); |
| cacheServer.start(); |
| return null; |
| } |
| }); |
| } |
| |
| private void startClient(VM client, final VM server, final int serverPort, |
| final String regionName) { |
| |
| client.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| getSystem(getClientProps()); |
| getCache(); |
| |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer(getServerHostName(server.getHost()), serverPort); |
| pf.create("pool1"); |
| |
| AttributesFactory af = new AttributesFactory(); |
| af.setScope(Scope.LOCAL); |
| af.setPoolName("pool1"); |
| createRegion(regionName, af.create()); |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Verifies that the test listener value on the given vm is what is expected |
| * Note that for remote events useWaitCriterion must be true |
| * |
| * @param vm |
| * the vm where verification should take place |
| * @param type |
| * the type of event to validate, use {@link MemoryEventType#UNKNOWN} |
| * to verify all events |
| * @param value |
| * the expected value |
| * @param useWaitCriterion |
| * must be true for remote events |
| */ |
| private void verifyListenerValue(VM vm, final MemoryState state, final int value, final boolean useWaitCriterion) { |
| vm.invoke(new SerializableCallable() { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public Object call() throws Exception { |
| WaitCriterion wc = null; |
| Set<ResourceListener> listeners = getGemfireCache().getResourceManager().getResourceListeners(ResourceType.OFFHEAP_MEMORY); |
| TestMemoryThresholdListener tmp_listener = null; |
| Iterator<ResourceListener> it = listeners.iterator(); |
| while (it.hasNext()) { |
| ResourceListener<MemoryEvent> l = it.next(); |
| if (l instanceof TestMemoryThresholdListener) { |
| tmp_listener = (TestMemoryThresholdListener) l; |
| break; |
| } |
| } |
| final TestMemoryThresholdListener listener = tmp_listener == null ? null : tmp_listener; |
| switch (state) { |
| case CRITICAL: |
| if (useWaitCriterion) { |
| wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Remote CRITICAL assert failed " + listener.toString(); |
| } |
| |
| @Override |
| public boolean done() { |
| return value == listener.getCriticalThresholdCalls(); |
| } |
| }; |
| } else { |
| assertEquals(value, listener.getCriticalThresholdCalls()); |
| } |
| break; |
| case CRITICAL_DISABLED: |
| if (useWaitCriterion) { |
| wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Remote CRITICAL_DISABLED assert failed " + listener.toString(); |
| } |
| |
| @Override |
| public boolean done() { |
| return value == listener.getCriticalDisabledCalls(); |
| } |
| }; |
| } else { |
| assertEquals(value, listener.getCriticalDisabledCalls()); |
| } |
| break; |
| case EVICTION: |
| if (useWaitCriterion) { |
| wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Remote EVICTION assert failed " + listener.toString(); |
| } |
| |
| @Override |
| public boolean done() { |
| return value == listener.getEvictionThresholdCalls(); |
| } |
| }; |
| } else { |
| assertEquals(value, listener.getEvictionThresholdCalls()); |
| } |
| break; |
| case EVICTION_DISABLED: |
| if (useWaitCriterion) { |
| wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Remote EVICTION_DISABLED assert failed " + listener.toString(); |
| } |
| |
| @Override |
| public boolean done() { |
| return value == listener.getEvictionDisabledCalls(); |
| } |
| }; |
| } else { |
| assertEquals(value, listener.getEvictionDisabledCalls()); |
| } |
| break; |
| case NORMAL: |
| if (useWaitCriterion) { |
| wc = new WaitCriterion() { |
| @Override |
| public String description() { |
| return "Remote NORMAL assert failed " + listener.toString(); |
| } |
| |
| @Override |
| public boolean done() { |
| return value == listener.getNormalCalls(); |
| } |
| }; |
| } else { |
| assertEquals(value, listener.getNormalCalls()); |
| } |
| break; |
| default: |
| throw new IllegalStateException("Unknown memory state"); |
| } |
| if (useWaitCriterion) { |
| waitForCriterion(wc, 5000, 100, true); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| private void verifyProfiles(VM vm, final int numberOfProfiles) { |
| vm.invoke(new SerializableCallable() { |
| public Object call() throws Exception { |
| InternalResourceManager irm = ((GemFireCacheImpl)getCache()).getResourceManager(); |
| final ResourceAdvisor ra = irm.getResourceAdvisor(); |
| WaitCriterion wc = new WaitCriterion() { |
| public String description() { |
| return "verify profiles failed. Current profiles: " + ra.adviseGeneric(); |
| } |
| public boolean done() { |
| return numberOfProfiles == ra.adviseGeneric().size(); |
| } |
| }; |
| waitForCriterion(wc, 10000, 10, true); |
| return null; |
| } |
| }); |
| } |
| |
| private Properties getServerProperties(int mcastPort) { |
| Properties p = new Properties(); |
| p.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastPort + ""); |
| p.setProperty(DistributionConfig.MCAST_TTL_NAME, "0"); |
| p.setProperty(DistributionConfig.LOCATORS_NAME, ""); |
| p.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "1m"); |
| return p; |
| } |
| |
| protected Properties getClientProps() { |
| Properties p = new Properties(); |
| p.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); |
| p.setProperty(DistributionConfig.LOCATORS_NAME, ""); |
| return p; |
| } |
| } |