blob: 17a9369a4bc8a085ac9b99cc93d7da5c8c109cb7 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache30;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.HashMap;
import junit.framework.AssertionFailedError;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.TombstoneService;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* concurrency-control tests for client/server
*
* @author bruce
*
*/
public class ClientServerCCEDUnitTest extends CacheTestCase {
public static LocalRegion TestRegion;
public void setup() {
// for bug #50683 we need a short queue-removal-message processing interval
HARegionQueue.setMessageSyncInterval(5);
}
public void tearDown2() {
disconnectAllFromDS();
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
public ClientServerCCEDUnitTest(String name) {
super(name);
}
public void testClientServerRRTombstoneGC() {
clientServerTombstoneGCTest(getUniqueName(), true);
}
public void testClientServerPRTombstoneGC() {
clientServerTombstoneGCTest(getUniqueName(), false);
}
public void testPutAllInNonCCEClient() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "Region";
int port = createServerRegion(vm0, name, true);
createClientRegion(vm1, name, port, false);
doPutAllInClient(vm1);
}
/**
* test that distributed GC messages are sent to clients and properly processed
* @param replicatedRegion whether to use a RR or PR in the servers
*/
private void clientServerTombstoneGCTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createClientRegion(vm2, name, port, true);
createClientRegion(vm3, name, port, true);
createEntries(vm2);
destroyEntries(vm3);
unregisterInterest(vm3);
forceGC(vm0);
if (!replicatedRegion) {
//other bucket might be in vm1
forceGC(vm1);
}
checkClientReceivedGC(vm2);
checkClientDoesNotReceiveGC(vm3);
}
/**
* for bug #40791 we pull tombstones into clients on get(), getAll() and
* registerInterest() to protect the client cache from stray putAll
* events sitting in backup queues on the server
*/
public void testClientRIGetsTombstonesRR() throws Exception {
clientRIGetsTombstoneTest(getUniqueName(),true);
}
public void testClientRIGetsTombstonesPR() throws Exception {
clientRIGetsTombstoneTest(getUniqueName(),false);
}
/**
* test that clients receive tombstones in register-interest results
*/
private void clientRIGetsTombstoneTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createEntries(vm0);
destroyEntries(vm0);
getLogWriter().info("***************** register interest on all keys");
createClientRegion(vm2, name, port, true);
registerInterest(vm2);
ensureAllTombstonesPresent(vm2);
getLogWriter().info("***************** clear cache and register interest on one key, Object0");
clearLocalCache(vm2);
registerInterestOneKey(vm2, "Object0");
List<String> keys = new ArrayList(1);
keys.add("Object0");
ensureAllTombstonesPresent(vm2, keys);
getLogWriter().info("***************** clear cache and register interest on four keys");
clearLocalCache(vm2);
keys = new ArrayList(4);
for (int i=0; i<4; i++) {
keys.add("Object"+i);
}
registerInterest(vm2, keys);
ensureAllTombstonesPresent(vm2, keys);
getLogWriter().info("***************** clear cache and register interest with regex on four keys");
clearLocalCache(vm2);
registerInterestRegex(vm2, "Object[0-3]");
ensureAllTombstonesPresent(vm2, keys);
getLogWriter().info("***************** fetch entries with getAll()");
clearLocalCache(vm2);
getAll(vm2);
ensureAllTombstonesPresent(vm2);
}
public void testClientRIGetsInvalidEntriesRR() throws Exception {
clientRIGetsInvalidEntriesTest(getUniqueName(),true);
}
public void testClientRIGetsInvalidEntriesPR() throws Exception {
clientRIGetsInvalidEntriesTest(getUniqueName(),false);
}
private void clientRIGetsInvalidEntriesTest(String uniqueName, boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = uniqueName + "Region";
createServerRegion(vm0, name, replicatedRegion);
int port = createServerRegion(vm1, name, replicatedRegion);
createEntries(vm0);
invalidateEntries(vm0);
getLogWriter().info("***************** register interest on all keys");
createClientRegion(vm2, name, port, true);
registerInterest(vm2);
ensureAllInvalidsPresent(vm2);
getLogWriter().info("***************** clear cache and register interest on one key, Object0");
clearLocalCache(vm2);
registerInterestOneKey(vm2, "Object0");
List<String> keys = new ArrayList(1);
keys.add("Object0");
ensureAllInvalidsPresent(vm2, keys);
getLogWriter().info("***************** clear cache and register interest on four keys");
clearLocalCache(vm2);
keys = new ArrayList(4);
for (int i=0; i<4; i++) {
keys.add("Object"+i);
}
registerInterest(vm2, keys);
ensureAllInvalidsPresent(vm2, keys);
getLogWriter().info("***************** clear cache and register interest with regex on four keys");
clearLocalCache(vm2);
registerInterestRegex(vm2, "Object[0-3]");
ensureAllInvalidsPresent(vm2, keys);
getLogWriter().info("***************** fetch entries with getAll()");
clearLocalCache(vm2);
getAll(vm2);
ensureAllInvalidsPresent(vm2);
}
private void registerInterest(VM vm) {
vm.invoke(new SerializableRunnable("register interest in all keys") {
public void run() {
TestRegion.registerInterestRegex(".*");
}
});
}
private void unregisterInterest(VM vm) {
vm.invoke(new SerializableRunnable("unregister interest in all keys") {
public void run() {
// TestRegion.dumpBackingMap();
TestRegion.unregisterInterestRegex(".*");
// TestRegion.dumpBackingMap();
}
});
}
private void registerInterest(VM vm, final List keys) {
vm.invoke(new SerializableRunnable("register interest in key list") {
public void run() {
TestRegion.registerInterest(keys);
}
});
}
private void registerInterestOneKey(VM vm, final String key) {
vm.invoke(new SerializableRunnable("register interest in " + key) {
public void run() {
TestRegion.registerInterest(key);
}
});
}
private void registerInterestRegex(VM vm, final String pattern) {
vm.invoke(new SerializableRunnable("register interest in key list") {
public void run() {
TestRegion.registerInterestRegex(pattern);
}
});
}
private void ensureAllTombstonesPresent(VM vm) {
vm.invoke(new SerializableCallable("check all are tombstones") {
public Object call() {
for (int i=0; i<10; i++) {
assertTrue("expected a tombstone for Object"+i, TestRegion.containsTombstone("Object"+i));
}
return null;
}
});
}
private void ensureAllTombstonesPresent(VM vm, final List keys) {
vm.invoke(new SerializableCallable("check tombstones in list") {
public Object call() {
for (Object key: keys) {
assertTrue("expected to find a tombstone for "+key, TestRegion.containsTombstone(key));
}
return null;
}
});
}
private void ensureAllInvalidsPresent(VM vm) {
vm.invoke(new SerializableCallable("check all are tombstones") {
public Object call() {
for (int i=0; i<10; i++) {
assertTrue("expected to find an entry for Object"+i, TestRegion.containsKey("Object"+i));
assertTrue("expected to find entry invalid for Object"+i, !TestRegion.containsValue("Object"+i));
}
return null;
}
});
}
private void ensureAllInvalidsPresent(VM vm, final List keys) {
vm.invoke(new SerializableCallable("check tombstones in list") {
public Object call() {
for (Object key: keys) {
assertTrue("expected to find an entry for "+key, TestRegion.containsKey(key));
assertTrue("expected to find entry invalid for "+key, !TestRegion.containsValue(key));
}
return null;
}
});
}
/* do a getAll of all keys */
private void getAll(VM vm) {
vm.invoke(new SerializableRunnable("getAll for all keys") {
public void run() {
Set<String> keys = new HashSet();
for (int i=0; i<10; i++) {
keys.add("Object"+i);
}
Map result = TestRegion.getAll(keys);
for (int i=0; i<10; i++) {
assertNull("expected no result for Object"+i, result.get("Object"+i));
}
}
});
}
/* this should remove all entries from the region, including tombstones */
private void clearLocalCache(VM vm) {
vm.invoke(new SerializableRunnable("clear local cache") {
public void run() {
TestRegion.localClear();
}
});
}
// private void closeCache(VM vm) {
public void testClientServerRRQueueCleanup() { // see bug #50879 if this fails
clientServerTombstoneMessageTest(true);
}
public void testClientServerPRQueueCleanup() { // see bug #50879 if this fails
clientServerTombstoneMessageTest(false);
}
/**
* test that distributed GC messages are properly cleaned out of durable
* client HA queues
*/
private void clientServerTombstoneMessageTest(boolean replicatedRegion) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String name = this.getUniqueName() + "Region";
int port1 = createServerRegion(vm0, name, replicatedRegion);
int port2 = createServerRegion(vm1, name, replicatedRegion);
createDurableClientRegion(vm2, name, port1, port2, true);
createDurableClientRegion(vm3, name, port1, port2, true);
createEntries(vm2);
destroyEntries(vm3);
forceGC(vm0);
if (!replicatedRegion) {
//other bucket might be in vm1
forceGC(vm1);
}
pause(5000); // better chance that WaitCriteria will succeed 1st time if we pause a bit
checkClientReceivedGC(vm2);
checkClientReceivedGC(vm3);
checkServerQueuesEmpty(vm0);
checkServerQueuesEmpty(vm1);
}
// private void closeCache(VM vm) {
// vm.invoke(new SerializableCallable() {
// public Object call() throws Exception {
// closeCache();
// return null;
// }
// });
// }
private void createEntries(VM vm) {
vm.invoke(new SerializableCallable("create entries") {
public Object call() {
for (int i=0; i<10; i++) {
TestRegion.create("Object"+i, Integer.valueOf(i));
}
return null;
}
});
}
private void destroyEntries(VM vm) {
vm.invoke(new SerializableCallable("destroy entries") {
public Object call() {
for (int i=0; i<10; i++) {
TestRegion.destroy("Object"+i, Integer.valueOf(i));
}
assertEquals(0, TestRegion.size());
if (TestRegion.getDataPolicy().isReplicate()) {
assertEquals(10, TestRegion.getTombstoneCount());
}
return null;
}
});
}
private void doPutAllInClient(VM vm) {
vm.invoke(new SerializableRunnable("do putAll") {
public void run() {
Map map = new HashMap();
for (int i=1000; i<1100; i++) {
map.put("object_"+i, i);
}
try {
TestRegion.putAll(map);
for (int i=1000; i<1100; i++) {
assertTrue("expected key object_"+i+" to be in the cache but it isn't", TestRegion.containsKey("object_"+i));
}
} catch (NullPointerException e) {
fail("caught NPE", e);
}
}
});
}
private void invalidateEntries(VM vm) {
vm.invoke(new SerializableCallable("invalidate entries") {
public Object call() {
for (int i=0; i<10; i++) {
TestRegion.invalidate("Object"+i, Integer.valueOf(i));
}
assertEquals(10, TestRegion.size());
return null;
}
});
}
private void forceGC(VM vm) {
vm.invoke(new SerializableCallable("force GC") {
public Object call() throws Exception {
TestRegion.getCache().getTombstoneService().forceBatchExpirationForTests(10);
return null;
}
});
}
private void checkClientReceivedGC(VM vm) {
vm.invoke(new SerializableCallable("check that GC happened") {
public Object call() throws Exception {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
getLogWriter().info("tombstone count = " + TestRegion.getTombstoneCount());
getLogWriter().info("region size = " + TestRegion.size());
return TestRegion.getTombstoneCount() == 0 && TestRegion.size() == 0;
}
@Override
public String description() {
return "waiting for garbage collection to occur";
}
};
waitForCriterion(wc, 60000, 2000, true);
return null;
}
});
}
private void checkServerQueuesEmpty(VM vm) {
vm.invoke(new SerializableCallable("check that client queues are properly cleared of old ClientTombstone messages") {
public Object call() throws Exception {
WaitCriterion wc = new WaitCriterion() {
// boolean firstTime = true;
@Override
public boolean done() {
CacheClientNotifier singleton = CacheClientNotifier.getInstance();
Collection<CacheClientProxy> proxies = singleton.getClientProxies();
// boolean first = firstTime;
// firstTime = false;
for (CacheClientProxy proxy: proxies) {
if (!proxy.isPrimary()) { // bug #50683 only applies to backup queues
int size = proxy.getQueueSize();
if (size > 0) {
// if (first) {
// ((LocalRegion)proxy.getHARegion()).dumpBackingMap();
// }
getLogWriter().info("queue size ("+size+") is still > 0 for " + proxy.getProxyID());
return false;
}
}
}
// also ensure that server regions have been cleaned up
int regionEntryCount = TestRegion.getRegionMap().size();
if (regionEntryCount > 0) {
getLogWriter().info("TestRegion has unexpected entries - all should have been GC'd but we have " + regionEntryCount);
TestRegion.dumpBackingMap();
return false;
}
return true;
}
@Override
public String description() {
return "waiting for queue removal messages to clear client queues";
}
};
waitForCriterion(wc, 60000, 2000, true);
return null;
}
});
}
private void checkClientDoesNotReceiveGC(VM vm) {
vm.invoke(new SerializableCallable("check that GC did not happen") {
public Object call() throws Exception {
if (TestRegion.getTombstoneCount() == 0) {
getLogWriter().warning("region has no tombstones");
// TestRegion.dumpBackingMap();
throw new AssertionFailedError("expected to find tombstones but region is empty");
}
return null;
}
});
}
private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
// TombstoneService.VERBOSE = true;
AttributesFactory af = new AttributesFactory();
if (replicatedRegion) {
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
} else {
af.setDataPolicy(DataPolicy.PARTITION);
af.setPartitionAttributes((new PartitionAttributesFactory()).setTotalNumBuckets(2).create());
}
TestRegion = (LocalRegion)createRootRegion(regionName, af.create());
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailableTCPPort();
server.setPort(port);
server.start();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
private void createClientRegion(final VM vm, final String regionName, final int port, final boolean ccEnabled) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(getServerHostName(vm.getHost()), port);
cf.setPoolSubscriptionEnabled(true);
cf.set("log-level", getDUnitLogLevel());
ClientCache cache = getClientCache(cf);
ClientRegionFactory crf = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
crf.setConcurrencyChecksEnabled(ccEnabled);
TestRegion = (LocalRegion)crf.create(regionName);
TestRegion.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, false, true);
return null;
}
};
vm.invoke(createRegion);
}
// For durable client QRM testing we need a backup queue (redundancy=1) and
// durable attributes. We also need to invoke readyForEvents()
private void createDurableClientRegion(final VM vm, final String regionName,
final int port1, final int port2, final boolean ccEnabled) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(getServerHostName(vm.getHost()), port1);
cf.addPoolServer(getServerHostName(vm.getHost()), port2);
cf.setPoolSubscriptionEnabled(true);
cf.setPoolSubscriptionRedundancy(1);
// bug #50683 - secondary durable queue retains all GC messages
cf.set("durable-client-id", ""+vm.getPid());
cf.set("durable-client-timeout", "" + 200);
cf.set("log-level", "fine");
ClientCache cache = getClientCache(cf);
ClientRegionFactory crf = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
crf.setConcurrencyChecksEnabled(ccEnabled);
TestRegion = (LocalRegion)crf.create(regionName);
TestRegion.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, true, true);
cache.readyForEvents();
return null;
}
};
vm.invoke(createRegion);
}
}