/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.pdx;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.VMCachedDeserializable;
import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.command.Put70;
import com.gemstone.gemfire.internal.cache.versions.VMVersionTag;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
 * Tests that retried operations recover the version tags recorded for the
 * original events, so that retries do not generate new versions
 * (bugs 44951, 45059 and 48205).
 *
 * @author dsmith
 */
public class ClientsWithVersioningRetryDUnitTest extends CacheTestCase {
// list of expected exceptions to remove in tearDown2()
static List<ExpectedException> expectedExceptions = new LinkedList<ExpectedException>();
public ClientsWithVersioningRetryDUnitTest(String name) {
super(name);
}
@Override
public void setUp() throws Exception {
super.setUp();
invokeInEveryVM(new SerializableRunnable() {
@Override
public void run() {
//Disable endpoint shuffling, so that the client will always connect
//to the first server we give it.
System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
}
});
}
@Override
public void tearDown2() throws Exception {
super.tearDown2();
invokeInEveryVM(new SerializableRunnable() {
@Override public void run() {
System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "false");
}
});
for (ExpectedException ex: expectedExceptions) {
ex.remove();
}
}
/**
 * Test that a retried put can recover the version information recorded
 * for the original event.
 * Covers the second failure in bug 44951.
 */
public void testRetryPut() {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
createServerRegion(vm0, RegionShortcut.REPLICATE);
createServerRegion(vm1, RegionShortcut.REPLICATE);
// create an event tag in vm0 and then replay that event in vm1
final DistributedMember memberID = (DistributedMember)vm0.invoke(new SerializableCallable("get id") {
public Object call() {
return ((DistributedRegion)getCache().getRegion("region")).getDistributionManager().getDistributionManagerId();
}
});
vm0.invoke(new SerializableCallable("create entry with fake event ID") {
@Override
public Object call() {
DistributedRegion dr = (DistributedRegion)getCache().getRegion("region");
VersionTag tag = new VMVersionTag();
tag.setMemberID(dr.getVersionMember());
tag.setRegionVersion(123);
tag.setEntryVersion(9);
tag.setVersionTimeStamp(System.currentTimeMillis());
EventID eventID = new EventID(new byte[0], 1, 0);
EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject", "TestValue", null,
false, memberID, true, eventID);
event.setVersionTag(tag);
event.setContext(new ClientProxyMembershipID(memberID));
dr.recordEvent(event);
event.release();
return memberID;
}
});
vm1.invoke(new SerializableRunnable("recover event tag in vm1 from vm0") {
@Override
public void run() {
DistributedRegion dr = (DistributedRegion)getCache().getRegion("region");
EventID eventID = new EventID(new byte[0], 1, 0);
EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject", "TestValue", null,
false, memberID, true, eventID);
try {
event.setContext(new ClientProxyMembershipID(memberID));
boolean recovered = ((BaseCommand)Put70.getCommand()).recoverVersionTagForRetriedOperation(event);
assertTrue("Expected to recover the version for this event ID", recovered);
assertEquals("Expected the region version to be 123", 123, event.getVersionTag().getRegionVersion());
} finally {
event.release();
}
}
});
// bug #48205 - a retried operation arriving at a PR node that does not own the
// primary bucket may already have had a version assigned to it in another
// backup bucket, so hasSeenEvent() must supply that version tag
vm1.invoke(new SerializableRunnable("recover posdup event tag in vm1 event tracker from vm0") {
@Override
public void run() {
DistributedRegion dr = (DistributedRegion)getCache().getRegion("region");
EventID eventID = new EventID(new byte[0], 1, 0);
EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject", "TestValue", null,
false, memberID, true, eventID);
event.setPossibleDuplicate(true);
try {
dr.hasSeenEvent(event);
assertTrue("Expected to recover the version for the event ID", event.getVersionTag() != null);
} finally {
event.release();
}
}
});
}
/**
* Test that we can successfully retry a distributed put all and get
* the version information.
* bug #45059
*/
public void testRetryPutAll() {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
final VM vm3 = host.getVM(3);
createServerRegion(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
//Make sure bucket 0 is primary in this member.
Region region = getCache().getRegion("region");
region.put(0, "value");
//Add an observer that closes vm1 when vm0 sends the distributed putAll message.
//By then the original putAll has already been applied to the cache, so the
//client's retry must recover the existing version tag instead of creating a new one.
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeSendMessage(DistributionManager dm,
DistributionMessage msg) {
if(msg instanceof DistributedPutAllOperation.PutAllMessage) {
DistributionMessageObserver.setInstance(null);
disconnectFromDS(vm1);
}
}
});
}
});
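// create two more servers and a client that connects to them; with endpoint
// shuffling disabled the client will use vm1 (port1) first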
int port1 = createServerRegion(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
int port2 = createServerRegion(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
createClientRegion(vm3, port1, port2);
//This will be a putAll to bucket 0.
//Here's the expected sequence:
//client->vm1
//vm1->vm0 (primary for bucket 0)
//vm0 kills vm1 before distributing the putAll
//vm0->vm2
//client retries the putAll against vm2
vm3.invoke(new SerializableCallable() {
public Object call() throws Exception {
Region region = getCache().getRegion("region");
Map map = new HashMap();
map.put(0, "a");
map.put(113, "b");
region.putAll(map);
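// the entry version for key 0 should be 2: one from the initial put in vm0 and
// one from this putAll; a retry that failed to recover the original version
// tag would have bumped it again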
RegionEntry entry = ((LocalRegion)region).getRegionEntry(0);
assertNotNull(entry);
assertNotNull(entry.getVersionStamp());
assertEquals(2, entry.getVersionStamp().getEntryVersion());
return null;
}
});
//Verify the observer was triggered
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
//if the observer was triggered, it would have cleared itself
assertNull(DistributionMessageObserver.getInstance());
}
});
//Make sure vm1 did in fact shut down
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
assertTrue(cache == null || cache.isClosed());
}
});
}
/**
* Test that we can successfully retry a distributed putAll on an accessor
* and get the version information.
* bug #48205
*/
public void testRetryPutAllInAccessor() {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
final VM vm3 = host.getVM(3);
getLogWriter().info("creating region in vm0");
createRegionInPeer(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
//Make sure bucket 0 is primary in this member.
Region region = getCache().getRegion("region");
region.put(0, "value");
}
});
getLogWriter().info("creating region in vm1");
createRegionInPeer(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
getLogWriter().info("creating region in vm2");
createRegionInPeer(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
getLogWriter().info("creating region in vm3");
createRegionInPeer(vm3, RegionShortcut.PARTITION_PROXY);
expectedExceptions.add(addExpectedException("RuntimeException", vm2));
vm2.invoke(new SerializableRunnable("install message listener to ignore update") {
public void run() {
//Add an observer in vm2 that ignores the distributed putAll message and kills
//vm0. vm1 has already applied the original putAll, so the accessor's retry
//must recover the existing version tag instead of creating a new one.
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage msg) {
if(msg instanceof DistributedPutAllOperation.PutAllMessage) {
DistributionMessageObserver.setInstance(null);
pause(5000); // give vm1 time to process the message that we're ignoring
disconnectFromDS(vm0);
// no reply will be sent to vm0 due to this exception, but that's okay
// because vm0 has been shut down
throw new RuntimeException("test code is ignoring message: " + msg);
}
}
});
}
});
//This will be a putAll to bucket 0.
//Here's the expected sequence:
//accessor->vm0 (primary)
//vm0->vm1, vm2
//vm2 ignores the message & kills vm0
//accessor retries against vm2 or vm1
//the version tag is recovered and put in the event & cache
vm3.invoke(new SerializableCallable("perform putAll in accessor") {
public Object call() throws Exception {
Region region = getCache().getRegion("region");
Map map = new HashMap();
map.put(0, "a");
map.put(113, "b");
region.putAll(map);
return null;
}
});
// verify that the version is correct
vm1.invoke(new SerializableRunnable("verify vm1") {
@Override
public void run() {
//if the observer was triggered, it would have cleared itself
assertNull(DistributionMessageObserver.getInstance());
Region region = getCache().getRegion("region");
VersionTag tag = ((LocalRegion)region).getVersionTag(0);
assertEquals(2, tag.getEntryVersion());
}
});
//Verify the observer was triggered and the version is correct
vm2.invoke(new SerializableRunnable("verify vm2") {
@Override
public void run() {
//if the observer was triggered, it would have cleared itself
assertNull(DistributionMessageObserver.getInstance());
Region region = getCache().getRegion("region");
VersionTag tag = ((LocalRegion)region).getVersionTag(0);
assertEquals(2, tag.getEntryVersion());
}
});
//Make sure vm0 did in fact shut down
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
assertTrue(cache == null || cache.isClosed());
}
});
}
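/**
 * Disconnects the given VM from the distributed system.
 */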
private void disconnectFromDS(VM vm) {
vm.invoke(new SerializableCallable("disconnecting vm " + vm) {
public Object call() throws Exception {
disconnectFromDS();
return null;
}
});
}
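/**
 * Creates a region named "region" with the given shortcut in the VM, starts a
 * cache server on a random port and returns that port. Non-replicated shortcuts
 * get partition attributes with two redundant copies.
 */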
private int createServerRegion(VM vm, final RegionShortcut shortcut) {
SerializableCallable createRegion = new SerializableCallable("create server region") {
public Object call() throws Exception {
RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
if (shortcut != RegionShortcut.REPLICATE) {
rf.setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(2).create());
}
rf.create("region");
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailableTCPPort();
server.setPort(port);
server.start();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
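/**
 * Creates a peer region named "region" with the given shortcut in the VM.
 * Non-replicated shortcuts get partition attributes with two redundant copies.
 */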
private void createRegionInPeer(VM vm, final RegionShortcut shortcut) {
SerializableCallable createRegion = new SerializableCallable("create peer region") {
public Object call() throws Exception {
RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
if (shortcut != RegionShortcut.REPLICATE) {
rf.setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(2).create());
}
rf.create("region");
return null;
}
};
vm.invoke(createRegion);
}
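/**
 * Overrides the distributed system properties to set conserve-sockets=false.
 */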
@Override
public Properties getDistributedSystemProperties() {
Properties p = super.getDistributedSystemProperties();
p.put("conserve-sockets", "false");
return p;
}
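/**
 * Creates a persistent replicated root region named "testSimplePdx" backed by a
 * disk store, optionally with a persistent PDX registry, starts a cache server
 * on a random port and returns that port.
 */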
private int createServerRegionWithPersistence(VM vm,
final boolean persistentPdxRegistry) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
CacheFactory cf = new CacheFactory();
if(persistentPdxRegistry) {
cf.setPdxPersistent(true)
.setPdxDiskStore("store");
}
Cache cache = getCache(cf);
cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs())
.create("store");
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setDiskStoreName("store");
createRootRegion("testSimplePdx", af.create());
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailableTCPPort();
server.setPort(port);
server.start();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
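/**
 * Creates an empty (accessor) root region named "testSimplePdx", starts a cache
 * server on a random port and returns that port.
 */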
private int createServerAccessor(VM vm) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.EMPTY);
createRootRegion("testSimplePdx", af.create());
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailableTCPPort();
server.setPort(port);
server.start();
return port;
}
};
return (Integer) vm.invoke(createRegion);
}
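/**
 * Creates a client cache in the VM with a pool that knows about both server
 * ports and a CACHING_PROXY region named "region".
 */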
private void createClientRegion(final VM vm, final int port1, final int port2) {
createClientRegion(vm, port1, port2, false);
}
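/**
 * Same as above, with optional thread-local pool connections. PR single-hop is
 * disabled and the pool read timeout is raised to ten minutes.
 */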
private void createClientRegion(final VM vm, final int port1, final int port2,
final boolean threadLocalConnections) {
SerializableCallable createRegion = new SerializableCallable("create client region in " + vm) {
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(getServerHostName(vm.getHost()), port1);
cf.addPoolServer(getServerHostName(vm.getHost()), port2);
cf.setPoolPRSingleHopEnabled(false);
cf.setPoolThreadLocalConnections(threadLocalConnections);
cf.setPoolReadTimeout(10 * 60 * 1000);
ClientCache cache = getClientCache(cf);
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
.create("region");
return null;
}
};
vm.invoke(createRegion);
}
}