| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import java.util.Properties; |
| |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EvictionAction; |
| import com.gemstone.gemfire.cache.EvictionAttributes; |
| import com.gemstone.gemfire.cache.InterestPolicy; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionFactory; |
| import com.gemstone.gemfire.cache.RegionShortcut; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.SubscriptionAttributes; |
| import com.gemstone.gemfire.cache30.CacheTestCase; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.DistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver; |
| import com.gemstone.gemfire.internal.cache.SearchLoadAndWriteProcessor.NetSearchRequestMessage; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| |
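| /** |
| * Distributed tests for the messaging performed by net search: a member whose |
| * region holds no data issues gets, and the tests check how many messages the |
| * other members receive, that a net search does not disturb LRU state, and that |
| * the search still succeeds when a queried member disconnects. |
| * |
| * @author dsmith |
| */ |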
| public class NetSearchMessagingDUnitTest extends CacheTestCase { |
| |
| /** |
| * Creates the test case, handing the name straight to the superclass constructor. |
| * |
| * @param name the test name forwarded to {@code super(name)} |
| */ |
| public NetSearchMessagingDUnitTest(String name) { |
| super(name); |
| } |
| |
| /** |
|  * With two replicates, one normal member and one empty member, a get issued |
|  * from the empty member must raise its own received-message count by one, |
|  * raise the replicates' combined count by one, and leave the normal member's |
|  * count untouched. Checked once for a missing key and once for a stored value. |
|  */ |
| public void testOneMessageWithReplicates() { |
|   Host host = Host.getHost(0); |
|   VM replicateA = host.getVM(0); |
|   VM replicateB = host.getVM(1); |
|   VM normal = host.getVM(2); |
|   VM empty = host.getVM(3); |
| |
|   createReplicate(replicateA); |
|   createReplicate(replicateB); |
|   createNormal(normal); |
|   createEmpty(empty); |
| |
|   // First pass: nothing has been stored under "a", so the get returns null. |
|   checkSingleMessageGet(replicateA, replicateB, normal, empty, null); |
| |
|   // Second pass: store a real value first, then repeat the same measurements. |
|   put(empty, "a", "b"); |
|   checkSingleMessageGet(replicateA, replicateB, normal, empty, "b"); |
| } |
| |
| /** |
|  * Snapshots every member's received-message count, performs a get of key "a" |
|  * from the empty member, and asserts the expected count deltas. |
|  * |
|  * @param replicateA first replicate member |
|  * @param replicateB second replicate member |
|  * @param normal member with a NORMAL region; must see no new messages |
|  * @param empty member that performs the get |
|  * @param expected value the get is required to return (may be null) |
|  */ |
| private void checkSingleMessageGet(VM replicateA, VM replicateB, VM normal, VM empty, Object expected) { |
|   long replicateABefore = getReceivedMessages(replicateA); |
|   long replicateBBefore = getReceivedMessages(replicateB); |
|   long normalBefore = getReceivedMessages(normal); |
|   long emptyBefore = getReceivedMessages(empty); |
| |
|   assertEquals(expected, get(empty, "a")); |
| |
|   // The requesting member processed exactly one message. |
|   long emptyAfter = getReceivedMessages(empty); |
|   assertEquals(emptyBefore + 1, emptyAfter); |
| |
|   // Between them, the two replicates saw exactly one message. |
|   long replicatesAfter = getReceivedMessages(replicateA) + getReceivedMessages(replicateB); |
|   assertEquals(replicateABefore + replicateBBefore + 1, replicatesAfter); |
| |
|   // The normal member was not contacted at all. |
|   long normalAfter = getReceivedMessages(normal); |
|   assertEquals(normalBefore, normalAfter); |
| } |
| |
| |
| /** |
|  * With three normal members and one empty member, a get issued from the empty |
|  * member must raise the normal members' combined received-message count by |
|  * three and the empty member's own count by three (presumably one reply per |
|  * queried member -- confirm against SearchLoadAndWriteProcessor). |
|  */ |
| public void testNetSearchNormals() { |
|   Host host = Host.getHost(0); |
|   VM normalA = host.getVM(0); |
|   VM normalB = host.getVM(1); |
|   VM normalC = host.getVM(2); |
|   final VM empty = host.getVM(3); |
| |
|   createNormal(normalA); |
|   createNormal(normalB); |
|   createNormal(normalC); |
|   createEmpty(empty); |
| |
|   // Pass 1: the key has never been stored, so the get returns null. |
|   { |
|     long aBefore = getReceivedMessages(normalA); |
|     long bBefore = getReceivedMessages(normalB); |
|     long cBefore = getReceivedMessages(normalC); |
|     long emptyBefore = getReceivedMessages(empty); |
| |
|     assertEquals(null, get(empty, "a")); |
| |
|     // The requesting member received exactly three messages. |
|     long emptyAfter = getReceivedMessages(empty); |
|     assertEquals(emptyBefore + 3, emptyAfter); |
| |
|     // The normal members received three messages in total. |
|     long normalsAfter = getReceivedMessages(normalA) + getReceivedMessages(normalB) + getReceivedMessages(normalC); |
|     assertEquals(aBefore + bBefore + cBefore + 3, normalsAfter); |
|   } |
| |
|   // Pass 2: the key now has a real value. |
|   { |
|     put(empty, "a", "b"); |
| |
|     long aBefore = getReceivedMessages(normalA); |
|     long bBefore = getReceivedMessages(normalB); |
|     long cBefore = getReceivedMessages(normalC); |
|     final long emptyBefore = getReceivedMessages(empty); |
| |
|     assertEquals("b", get(empty, "a")); |
| |
|     // The third normal member received exactly one message. |
|     assertEquals(cBefore + 1, getReceivedMessages(normalC)); |
| |
|     // The requesting member's count is polled rather than asserted immediately; |
|     // it must reach exactly three more than before within two seconds. |
|     final long expectedForEmpty = emptyBefore + 3; |
|     WaitCriterion threeReceived = new WaitCriterion() { |
|       public boolean done() { |
|         return getReceivedMessages(empty) == expectedForEmpty; |
|       } |
|       public String description() { |
|         return "expected " + expectedForEmpty + " but was " + getReceivedMessages(empty); |
|       } |
|     }; |
|     DistributedTestCase.waitForCriterion(threeReceived, 2000, 100, true); |
| |
|     // The normal members received three messages in total. |
|     long normalsAfter = getReceivedMessages(normalA) + getReceivedMessages(normalB) + getReceivedMessages(normalC); |
|     assertEquals(aBefore + bBefore + cBefore + 3, normalsAfter); |
|   } |
| } |
| |
| /** |
| * In bug #48186 a deadlock occurs when a netsearch pulls in a value from |
| * the disk and causes a LRU eviction of another entry. Here we merely |
| * demonstrate that a netsearch that gets the value of an overflow entry |
| * does not update the LRU status of that entry. |
| */ |
| public void testNetSearchNoLRU() { |
| Host host = Host.getHost(0); |
| VM vm2 = host.getVM(2); |
| VM vm1 = host.getVM(1); |
| |
| createOverflow(vm2, 5); |
| createEmpty(vm1); |
| |
| |
| |
| //Test with a null value |
| { |
| put(vm2, "a", "1"); |
| put(vm2, "b", "2"); |
| put(vm2, "c", "3"); |
| put(vm2, "d", "4"); |
| put(vm2, "e", "5"); |
| // the cache in vm0 is now full and LRU will occur on this next put() |
| put(vm2, "f", "6"); |
| |
| SerializableCallable verifyEvicted = new SerializableCallable("verify eviction of 'a'") { |
| public Object call() { |
| Cache cache = getCache(); |
| LocalRegion region = (LocalRegion)cache.getRegion("region"); |
| RegionEntry re = region.getRegionEntry("a"); |
| Object o = re.getValueInVM(null); |
| getLogWriter().info("key a="+o);; |
| return o == null || o == Token.NOT_AVAILABLE; |
| } |
| }; |
| |
| boolean evicted = (Boolean)vm2.invoke(verifyEvicted); |
| assertTrue("expected 'a' to be evicted", evicted); |
| |
| // now netsearch for 'a' from the other VM and verify again |
| Object value = get(vm1, "a"); |
| assertEquals("expected to find '1' result from netSearch", "1", value); |
| |
| evicted = (Boolean)vm2.invoke(verifyEvicted); |
| assertTrue("expected 'a' to still be evicted", evicted); |
| vm2.invoke(new SerializableRunnable("verify other entries are not evicted") { |
| public void run() { |
| Cache cache = getCache(); |
| LocalRegion region = (LocalRegion)cache.getRegion("region"); |
| String[] keys = new String[]{"b", "c", "d", "e", "f"}; |
| for (String key: keys) { |
| RegionEntry re = region.getRegionEntry(key); |
| Object o = re.getValueInVM(null); |
| getLogWriter().info("key " + key + "=" + o); |
| assertTrue("expected key " + key + " to not be evicted", |
| (o != null) && (o != Token.NOT_AVAILABLE)); |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
|  * Make sure that even if we start out by net searching replicates, |
|  * we'll fall back to net searching normal members. |
|  * <p> |
|  * vm0 hosts the only replicate and is rigged to disconnect as soon as it is |
|  * asked to process a net search request; the get from the empty member must |
|  * still return the value, and vm0 must end up without a cache. |
|  */ |
| public void testNetSearchFailoverFromReplicate() { |
|   Host host = Host.getHost(0); |
|   VM vm0 = host.getVM(0); |
|   VM vm1 = host.getVM(1); |
|   VM vm2 = host.getVM(2); |
|   VM vm3 = host.getVM(3); |
| |
|   // Install a listener to kill this member when we get the netsearch request |
|   installDisconnectOnNetSearchObserver(vm0); |
|   try { |
|     createReplicate(vm0); |
|     createNormal(vm1); |
|     createNormal(vm2); |
|     createEmpty(vm3); |
| |
|     // Test with a real value |
|     put(vm3, "a", "b"); |
|     assertEquals("b", get(vm3, "a")); |
| |
|     // Make sure we were disconnected in vm0 |
|     vm0.invoke(new SerializableRunnable("check disconnected") { |
|       public void run() { |
|         assertNull(GemFireCacheImpl.getInstance()); |
|       } |
|     }); |
|   } finally { |
|     // The observer is static state in vm0; remove it so it cannot leak |
|     // into whatever test runs in that VM next. |
|     clearMessageObserver(vm0); |
|   } |
| } |
| |
| /** |
|  * Same idea as the replicate-to-normal failover test, but with two |
|  * replicates: vm0 disconnects when it is asked to process a net search |
|  * request, and the get must still succeed. Gets are repeated until vm0 has |
|  * actually been disconnected, since a given search may not reach vm0. |
|  */ |
| public void testNetSearchFailoverFromOneReplicateToAnother() { |
|   Host host = Host.getHost(0); |
|   VM vm0 = host.getVM(0); |
|   VM vm1 = host.getVM(1); |
|   VM vm3 = host.getVM(3); |
| |
|   // Install a listener to kill this member when we get the netsearch request |
|   installDisconnectOnNetSearchObserver(vm0); |
|   try { |
|     createReplicate(vm0); |
|     createReplicate(vm1); |
|     createEmpty(vm3); |
| |
|     // Test with a real value |
|     put(vm3, "a", "b"); |
| |
|     SerializableCallable isDisconnected = new SerializableCallable("check disconnected") { |
|       public Object call() { |
|         return GemFireCacheImpl.getInstance() == null; |
|       } |
|     }; |
| |
|     // Bound the retry loop so a search that never reaches vm0 fails the |
|     // test instead of hanging the whole run. |
|     final long deadline = System.currentTimeMillis() + 60 * 1000L; |
|     boolean disconnected = false; |
|     while (!disconnected) { |
|       assertEquals("b", get(vm3, "a")); |
| |
|       // Make sure we were disconnected in vm0 |
|       disconnected = (Boolean) vm0.invoke(isDisconnected); |
|       if (!disconnected && System.currentTimeMillis() > deadline) { |
|         fail("vm0 was never disconnected by a net search request within 60 seconds"); |
|       } |
|     } |
|   } finally { |
|     // The observer is static state in vm0; remove it so it cannot leak |
|     // into whatever test runs in that VM next. |
|     clearMessageObserver(vm0); |
|   } |
| } |
| |
| /** |
|  * Installs, in the given VM, a message observer that disconnects the VM from |
|  * the distributed system just before it processes a NetSearchRequestMessage. |
|  * |
|  * @param vm the member that should drop out when it is net searched |
|  */ |
| private void installDisconnectOnNetSearchObserver(VM vm) { |
|   vm.invoke(new SerializableRunnable("Install listener") { |
|     public void run() { |
|       DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
|         public void beforeProcessMessage(DistributionManager dm, |
|             DistributionMessage message) { |
|           if (message instanceof NetSearchRequestMessage) { |
|             disconnectFromDS(); |
|           } |
|         } |
|       }); |
|     } |
|   }); |
| } |
| |
| /** |
|  * Removes whatever message observer is installed in the given VM. |
|  * |
|  * @param vm the member whose observer should be cleared |
|  */ |
| private void clearMessageObserver(VM vm) { |
|   vm.invoke(new SerializableRunnable("Remove listener") { |
|     public void run() { |
|       DistributionMessageObserver.setInstance(null); |
|     } |
|   }); |
| } |
| |
| /** |
|  * Puts {@code key -> value} into the region named "region" inside the given |
|  * VM, logging before and after the operation. |
|  * |
|  * @param vm the member in which to perform the put |
|  * @param key the key to store |
|  * @param value the value to store |
|  * @return whatever {@code Region.put} returned in the remote VM |
|  */ |
| private Object put(VM vm, final String key, final String value) { |
|   SerializableCallable putTask = new SerializableCallable() { |
|     public Object call() { |
|       Region region = getCache().getRegion("region"); |
|       getLogWriter().info("putting key="+key+"="+value); |
|       Object previous = region.put(key, value); |
|       getLogWriter().info("done putting key="+key); |
|       return previous; |
|     } |
|   }; |
|   return vm.invoke(putTask); |
| } |
| |
| /** |
|  * Performs a get on the region named "region" inside the given VM. |
|  * |
|  * @param vm the member in which to perform the get |
|  * @param key the key to look up |
|  * @return the value returned by {@code Region.get} in the remote VM |
|  */ |
| private Object get(VM vm, final Object key) { |
|   SerializableCallable getTask = new SerializableCallable("get " + key) { |
|     public Object call() { |
|       Region region = getCache().getRegion("region"); |
|       return region.get(key); |
|     } |
|   }; |
|   return vm.invoke(getTask); |
| } |
| |
| /** |
|  * Reads the distribution manager's received-message counter in the given VM. |
|  * |
|  * @param vm the member whose counter is read |
|  * @return the value of {@code getDMStats().getReceivedMessages()} in that VM |
|  */ |
| private long getReceivedMessages(VM vm) { |
|   Object count = vm.invoke(new SerializableCallable("get received message count") { |
|     public Object call() { |
|       GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); |
|       return cache.getDistributedSystem().getDMStats().getReceivedMessages(); |
|     } |
|   }); |
|   // Unbox as a long; narrowing through intValue() would truncate large counts. |
|   return ((Long) count).longValue(); |
| } |
| |
| /** |
|  * Creates, in the given VM, a region named "region" with DISTRIBUTED_ACK |
|  * scope, concurrency checks disabled and the EMPTY data policy. |
|  * |
|  * @param vm the member in which to create the region |
|  */ |
| private void createEmpty(VM vm) { |
|   vm.invoke(new SerializableRunnable("create empty region") { |
|     public void run() { |
|       // Build the factory from the test's cache rather than through the |
|       // deprecated no-arg RegionFactory constructor. |
|       Cache cache = getCache(); |
|       RegionFactory<Object, Object> factory = cache.createRegionFactory(); |
|       factory.setScope(Scope.DISTRIBUTED_ACK); |
|       factory.setConcurrencyChecksEnabled(false); |
|       factory.setDataPolicy(DataPolicy.EMPTY); |
|       factory.create("region"); |
|     } |
|   }); |
| } |
| |
| /** |
|  * Creates, in the given VM, a region named "region" with DISTRIBUTED_ACK |
|  * scope, concurrency checks disabled, the NORMAL data policy and an |
|  * interest policy of ALL. |
|  * |
|  * @param vm the member in which to create the region |
|  */ |
| private void createNormal(VM vm) { |
|   vm.invoke(new SerializableRunnable("create normal region") { |
|     public void run() { |
|       // Build the factory from the test's cache rather than through the |
|       // deprecated no-arg RegionFactory constructor. |
|       Cache cache = getCache(); |
|       RegionFactory<Object, Object> factory = cache.createRegionFactory(); |
|       factory.setScope(Scope.DISTRIBUTED_ACK); |
|       factory.setConcurrencyChecksEnabled(false); |
|       factory.setDataPolicy(DataPolicy.NORMAL); |
|       factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
|       factory.create("region"); |
|     } |
|   }); |
| } |
| |
| /** |
|  * Creates, in the given VM, a region named "region" from the REPLICATE |
|  * shortcut with entry-count LRU eviction that overflows to disk. |
|  * |
|  * @param vm the member in which to create the region |
|  * @param count the entry limit passed to the LRU eviction attributes |
|  */ |
| private void createOverflow(VM vm, final int count) { |
|   SerializableRunnable createTask = new SerializableRunnable() { |
|     public void run() { |
|       EvictionAttributes overflowToDisk = |
|           EvictionAttributes.createLRUEntryAttributes(count, EvictionAction.OVERFLOW_TO_DISK); |
|       RegionFactory factory = getCache().createRegionFactory(RegionShortcut.REPLICATE); |
|       factory.setEvictionAttributes(overflowToDisk); |
|       factory.create("region"); |
|     } |
|   }; |
|   vm.invoke(createTask); |
| } |
| |
| /** |
|  * Creates, in the given VM, a region named "region" with DISTRIBUTED_ACK |
|  * scope, concurrency checks disabled and the REPLICATE data policy. |
|  * |
|  * @param vm the member in which to create the region |
|  */ |
| private void createReplicate(VM vm) { |
|   vm.invoke(new SerializableRunnable("create replicate region") { |
|     public void run() { |
|       // Build the factory from the test's cache rather than through the |
|       // deprecated no-arg RegionFactory constructor. |
|       Cache cache = getCache(); |
|       RegionFactory<Object, Object> factory = cache.createRegionFactory(); |
|       factory.setScope(Scope.DISTRIBUTED_ACK); |
|       factory.setConcurrencyChecksEnabled(false); |
|       factory.setDataPolicy(DataPolicy.REPLICATE); |
|       factory.create("region"); |
|     } |
|   }); |
| } |
| |
| } |