blob: 342694f8eef991f66ac4867ce0327b350c3bc32a [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache30;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.ForcedDisconnectException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.LossAction;
import com.gemstone.gemfire.cache.MembershipAttributes;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.ResumptionAction;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.ReconnectListener;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.OSProcess;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
import com.gemstone.org.jgroups.Event;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.protocols.pbcast.GMS;
import com.gemstone.org.jgroups.stack.Protocol;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
public class ReconnectDUnitTest extends CacheTestCase
{
// Port of the locator started by setUp(). Static (per-JVM) so that runnables executing in
// other dunit VMs can set it themselves (see the "locatorPort = locPort" assignments).
static int locatorPort;
// Locator started in the locator VM by setUp() and stopped by tearDown2().
static Locator locator;
// Per-VM handle to the DistributedSystem under test. Runnables store it here so that later
// invocations in the same VM (e.g. waitForReconnect) can pick it up; tearDown2() clears it.
static DistributedSystem savedSystem;
// dunit VM index that setUp() starts the locator in.
static int locatorVMNumber = 3;
// Lazily-built, cached distributed-system properties; see getDistributedSystemProperties().
Properties dsProperties;
/**
 * Creates the test case; {@code name} is passed unchanged to the superclass constructor.
 */
public ReconnectDUnitTest(String name) {
super(name);
}
/**
 * Per-test setup.
 *
 * <p>Picks a random free port for the locator and starts a locator (with its own distributed
 * system) in VM {@link #locatorVMNumber}, disconnecting any system already running there. Then,
 * in this VM, builds cache XML for a region "myRegion" (see createAtts()) named
 * "MyDisconnect" and closes the cache and system again. Presumably finishCacheXml writes the
 * MyDisconnect-cache.xml file the tests later load via cache-xml-file — TODO confirm.
 */
@Override
public void setUp() throws Exception {
  super.setUp();
  // locatorPort is static: assign it through the class rather than through "this".
  locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  final int locPort = locatorPort;
  Host.getHost(0).getVM(locatorVMNumber)
      .invoke(new SerializableRunnable("start locator") {
        public void run() {
          try {
            InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
            if (ds != null) {
              ds.disconnect();
            }
            // Statics are per-JVM, so publish the chosen port in the locator VM as well.
            locatorPort = locPort;
            Properties props = getDistributedSystemProperties();
            props.put("log-file", "autoReconnectLocatorVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
            locator = Locator.startLocatorAndDS(locatorPort, null, props);
            addExpectedException("com.gemstone.gemfire.ForcedDisconnectException||Possible loss of quorum");
          } catch (IOException e) {
            fail("unable to start locator", e);
          }
        }
      });
  beginCacheXml();
  createRegion("myRegion", createAtts());
  finishCacheXml("MyDisconnect");
  closeCache();
  getSystem().disconnect();
  getLogWriter().fine("Cache Closed ");
}
/**
 * Returns the distributed-system properties used by this test.
 *
 * <p>On top of the superclass defaults it sets:
 * <ul>
 * <li>max-wait-time-for-reconnect = 20000;</li>
 * <li>network-partition detection enabled;</li>
 * <li>auto-reconnect not disabled;</li>
 * <li>mcast-port 0;</li>
 * <li>member-timeout 1000;</li>
 * <li>the dunit log level;</li>
 * <li>the locators list pointing at {@link #locatorPort}.</li>
 * </ul>
 *
 * <p>The object is built on the first call and cached in {@link #dsProperties}. Every later
 * call returns that same instance. Consequently:
 * <ul>
 * <li>the locator port is captured at the time of the first call;</li>
 * <li>entries that callers {@code put} into the result stay visible to later callers on the
 *     same test instance.</li>
 * </ul>
 */
@Override
public Properties getDistributedSystemProperties() {
  if (dsProperties == null) {
    dsProperties = super.getDistributedSystemProperties();
    dsProperties.put(DistributionConfig.MAX_WAIT_TIME_FOR_RECONNECT_NAME, "20000");
    dsProperties.put(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
    dsProperties.put(DistributionConfig.DISABLE_AUTO_RECONNECT_NAME, "false");
    // locatorPort is static; read it through the class rather than through "this".
    dsProperties.put(DistributionConfig.LOCATORS_NAME, "localHost["+locatorPort+"]");
    dsProperties.put(DistributionConfig.MCAST_PORT_NAME, "0");
    dsProperties.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "1000");
    dsProperties.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
  }
  return dsProperties;
}
/**
 * Per-test teardown.
 *
 * <p>Runs the superclass teardown, then stops the locator that setUp() started in VM
 * {@link #locatorVMNumber}. Whatever happens there, it then clears {@link #savedSystem} in
 * every VM and disconnects all VMs from the distributed system.
 */
@Override
public void tearDown2() throws Exception
{
  try {
    super.tearDown2();
    // Target the same VM setUp() used rather than hard-coding its index.
    Host.getHost(0).getVM(locatorVMNumber).invoke(new SerializableRunnable("stop locator") {
      public void run() {
        if (locator != null) {
          getLogWriter().info("stopping locator " + locator);
          locator.stop();
          // Forget the stopped locator so a later teardown in this VM does not stop it again.
          locator = null;
        }
      }
    });
  } finally {
    invokeInEveryVM(new SerializableRunnable() {
      public void run() {
        ReconnectDUnitTest.savedSystem = null;
      }
    });
    disconnectAllFromDS();
  }
}
/**
 * Builds the attributes used for the test region: data policy REPLICATE with
 * DISTRIBUTED_ACK scope.
 */
private RegionAttributes createAtts()
{
  AttributesFactory attrsFactory = new AttributesFactory();
  attrsFactory.setDataPolicy(DataPolicy.REPLICATE);
  attrsFactory.setScope(Scope.DISTRIBUTED_ACK);
  return attrsFactory.create();
}
/**
 * Connects members with reconnect settings max-wait-time-reconnect=200 and
 * max-num-reconnect-tries=1, building their caches from MyDisconnect-cache.xml.
 *
 * <p>Per an earlier reviewer note, this test does not appear to actually trigger a reconnect.
 * What it does:
 * <ol>
 * <li>vm0 puts MyKey1 into root/myRegion.</li>
 * <li>vm1 puts Mykey2 and checks that MyKey1 is visible.</li>
 * <li>vm1 calls getSystem/getCache again with the same properties and checks that MyKey1 is
 *     still readable.</li>
 * </ol>
 *
 * @throws TimeoutException declared by the original signature
 * @throws CacheException declared by the original signature
 * @throws IOException declared by the original signature
 */
public void testReconnect() throws TimeoutException, CacheException,
    IOException
{
  final int locPort = locatorPort;
  final String xmlFileLoc = (new File(".")).getAbsolutePath();
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);

  // vm0: create the cache from XML and seed MyKey1.
  vm0.invoke(new CacheSerializableRunnable("Create Cache and Regions from cache.xml") {
    public void run2() throws CacheException
    {
      Properties props = reconnectTestProperties(locPort, xmlFileLoc);
      getLogWriter().info("test is creating distributed system");
      getSystem(props);
      getLogWriter().info("test is creating cache");
      getCache().getRegion("root/myRegion").put("MyKey1", "MyValue1");
    }
  });

  // vm1: create the cache from XML, add Mykey2 and confirm vm0's entry replicated.
  vm1.invoke(new CacheSerializableRunnable("Create Cache and Regions from cache.xml") {
    public void run2() throws CacheException
    {
      getSystem(reconnectTestProperties(locPort, xmlFileLoc));
      Region region = getCache().getRegion("root/myRegion");
      region.put("Mykey2", "MyValue2");
      assertNotNull(region.get("MyKey1"));
    }
  });

  // vm1 again: re-acquire the system/cache and confirm the data is still there.
  vm1.invoke(new CacheSerializableRunnable("Create Region") {
    public void run2() throws CacheException
    {
      getSystem(reconnectTestProperties(locPort, xmlFileLoc));
      Region region = getCache().getRegion("root/myRegion");
      assertNotNull(region.get("MyKey1"));
      getLogWriter().fine("MyKey1 Value after disconnect : " + region.get("MyKey1"));
    }
  });
}

/**
 * Prepares the distributed-system properties used by testReconnect in the current VM.
 *
 * <p>Sets {@link #locatorPort} to {@code locPort}, then takes the shared properties from
 * getDistributedSystemProperties() and adds:
 * <ul>
 * <li>the MyDisconnect-cache.xml location under {@code xmlFileLoc};</li>
 * <li>max-wait-time-reconnect=200;</li>
 * <li>max-num-reconnect-tries=1.</li>
 * </ul>
 */
private Properties reconnectTestProperties(int locPort, String xmlFileLoc) {
  locatorPort = locPort;
  Properties props = getDistributedSystemProperties();
  props.put("cache-xml-file", xmlFileLoc + "/MyDisconnect-cache.xml");
  props.put("max-wait-time-reconnect", "200");
  props.put("max-num-reconnect-tries", "1");
  return props;
}
/**
 * Force-disconnects vm0 and, 10 seconds later, vm1, then waits for both to auto-reconnect.
 * The original author's summary of the scenario: "quorum check fails, then succeeds".
 *
 * Setup:
 * - In the locator VM, force-disconnect on quorum loss is disabled
 *   (GMS.disableDisconnectOnQuorumLossForTesting), per the inline comment for stability.
 * - vm0, vm1 and vm2 each create a cache from MyDisconnect-cache.xml with
 *   max-num-reconnect-tries=2.
 *
 * If waiting for a reconnect throws, stacks of all VMs are dumped before the exception is
 * rethrown.
 */
public void testReconnectWithQuorum() throws Exception {
addExpectedException("killing member's ds");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int locPort = locatorPort;
final String xmlFileLoc = (new File(".")).getAbsolutePath();
// disable disconnects in the locator so we have some stability
host.getVM(locatorVMNumber).invoke(new SerializableRunnable("disable force-disconnect") {
public void run() {
GMS gms = (GMS)MembershipManagerHelper.getJChannel(InternalDistributedSystem.getConnectedInstance())
.getProtocolStack().findProtocol("GMS");
gms.disableDisconnectOnQuorumLossForTesting();
}}
);
// Creates a cache from XML in the invoking VM, records its system in savedSystem (for
// waitForReconnect) and returns the member ID.
SerializableCallable create = new SerializableCallable(
"Create Cache and Regions from cache.xml") {
public Object call() throws CacheException
{
// DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
props.put("max-num-reconnect-tries", "2");
props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
Cache cache = new CacheFactory(props).create();
addExpectedException("com.gemstone.gemfire.ForcedDisconnectException||Possible loss of quorum");
Region myRegion = cache.getRegion("root/myRegion");
ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
myRegion.put("MyKey1", "MyValue1");
// MembershipManagerHelper.getMembershipManager(cache.getDistributedSystem()).setDebugJGroups(true);
// myRegion.put("Mykey2", "MyValue2");
return savedSystem.getDistributedMember();
}
};
System.out.println("creating caches in vm0, vm1 and vm2");
vm0.invoke(create);
vm1.invoke(create);
vm2.invoke(create);
// view is [locator(3), vm0(15), vm1(10), vm2(10)]
// (NOTE(review): the numbers are presumably membership weights used for quorum - confirm)
/* now we want to cause vm0 and vm1 to force-disconnect. This may cause the other
* non-locator member to also disconnect, depending on the timing
*/
System.out.println("disconnecting vm0");
forceDisconnect(vm0);
// deliberate delay between the two forced disconnects; the scenario is timing-dependent
pause(10000);
System.out.println("disconnecting vm1");
forceDisconnect(vm1);
/* now we wait for them to auto-reconnect */
try {
System.out.println("waiting for vm0 to reconnect");
waitForReconnect(vm0);
System.out.println("waiting for vm1 to reconnect");
waitForReconnect(vm1);
System.out.println("done reconnecting vm0 and vm1");
} catch (Exception e) {
// capture thread dumps of every VM to help diagnose a reconnect hang, then fail
dumpAllStacks();
throw e;
}
}
/**
 * Runs the forced-disconnect reconnect scenario without the competing application thread.
 * NOTE(review): the "disabledtest" prefix (instead of "test") presumably keeps the JUnit 3
 * runner from executing this method; rename it to re-enable.
 */
public void disabledtestReconnectOnForcedDisconnect() throws Exception {
doTestReconnectOnForcedDisconnect(false);
}
/**
 * Regression test for bug #51335: while auto-reconnect is in progress, the application also
 * tries to recreate the cache, and that application attempt is expected to fail.
 *
 * Disabled because of a high failure rate during CI test runs; see bug #52160.
 */
public void disabledtestReconnectCollidesWithApplication() throws Exception {
doTestReconnectOnForcedDisconnect(true);
}
/**
 * Shared body of the forced-disconnect reconnect tests. Steps, as visible in the code:
 *
 * 1. vm0 creates a cache from MyDisconnect-cache.xml (create1: max-wait-time-reconnect=1000,
 *    max-num-reconnect-tries=2).
 * 2. vm1 creates a cache and also starts a second locator on secondLocPort (create2), which is
 *    added to its locators list. If createInAppToo is true, vm1 also starts a daemon thread
 *    that waits for the cache to close, then tries to create a cache itself and logs an error
 *    if that succeeds.
 * 3. vm1 is force-disconnected. It must begin reconnecting within 30s and reconnect within
 *    60s, and afterwards host a running (not stopped) locator.
 * 4. vm1 is force-disconnected again; ds.stopReconnecting() must leave no distributed system
 *    running in vm1.
 * 5. vm1 recreates a cache with create1 (no locator), is force-disconnected, and must
 *    reconnect again.
 *
 * Locator state files for both ports are deleted before the scenario and at the end.
 *
 * @param createInAppToo whether vm1 should race an application thread that tries to recreate
 *        the cache while auto-reconnect is running
 */
public void doTestReconnectOnForcedDisconnect(final boolean createInAppToo) throws Exception {
addExpectedException("killing member's ds");
// getSystem().disconnect();
// getLogWriter().fine("Cache Closed ");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final int locPort = locatorPort;
final int secondLocPort = AvailablePortHelper.getRandomAvailableTCPPort();
deleteLocatorStateFile(locPort, secondLocPort);
final String xmlFileLoc = (new File(".")).getAbsolutePath();
// Creates a cache from XML (no locator), stores the system in savedSystem, returns member ID.
SerializableCallable create1 = new SerializableCallable(
"Create Cache and Regions from cache.xml") {
public Object call() throws CacheException
{
// DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
props.put("max-wait-time-reconnect", "1000");
props.put("max-num-reconnect-tries", "2");
props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
Cache cache = new CacheFactory(props).create();
Region myRegion = cache.getRegion("root/myRegion");
ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
myRegion.put("MyKey1", "MyValue1");
// myRegion.put("Mykey2", "MyValue2");
return savedSystem.getDistributedMember();
}
};
// Like create1 but also starts a locator on secondLocPort in the member, and optionally
// launches the competing "application" cache-creation thread.
SerializableCallable create2 = new SerializableCallable(
"Create Cache and Regions from cache.xml") {
public Object call() throws CacheException
{
// DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
locatorPort = locPort;
final Properties props = getDistributedSystemProperties();
props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
props.put("max-wait-time-reconnect", "5000");
props.put("max-num-reconnect-tries", "2");
props.put("start-locator", "localhost["+secondLocPort+"]");
props.put("locators", props.get("locators")+",localhost["+secondLocPort+"]");
props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
getSystem(props);
// MembershipManagerHelper.getMembershipManager(system).setDebugJGroups(true);
final Cache cache = getCache();
ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
Region myRegion = cache.getRegion("root/myRegion");
//myRegion.put("MyKey1", "MyValue1");
myRegion.put("Mykey2", "MyValue2");
assertNotNull(myRegion.get("MyKey1"));
//getLogWriter().fine("MyKey1 value is : "+myRegion.get("MyKey1"));
if (createInAppToo) {
// Simulates the application: polls every 100ms until the cache closes, then tries to
// create a new cache; succeeding is logged as a test failure.
Thread recreateCacheThread = new Thread("ReconnectDUnitTest.createInAppThread") {
public void run() {
while (!cache.isClosed()) {
pause(100);
}
try {
new CacheFactory(props).create();
getLogWriter().error("testReconnectCollidesWithApplication failed - application thread was able to create a cache");
} catch (IllegalStateException cacheExists) {
// expected
}
}
};
recreateCacheThread.setDaemon(true);
recreateCacheThread.start();
}
return cache.getDistributedSystem().getDistributedMember();
}
};
vm0.invoke(create1);
DistributedMember dm = (DistributedMember)vm1.invoke(create2);
forceDisconnect(vm1);
DistributedMember newdm = (DistributedMember)vm1.invoke(new SerializableCallable("wait for reconnect(1)") {
public Object call() {
final DistributedSystem ds = ReconnectDUnitTest.savedSystem;
ReconnectDUnitTest.savedSystem = null;
waitForCriterion(new WaitCriterion() {
public boolean done() {
return ds.isReconnecting();
}
public String description() {
return "waiting for ds to begin reconnecting";
}
}, 30000, 1000, true);
getLogWriter().info("entering reconnect wait for " + ds);
getLogWriter().info("ds.isReconnecting() = " + ds.isReconnecting());
// stays true unless every assertion below passes; the finally block then disconnects ds
boolean failure = true;
try {
ds.waitUntilReconnected(60, TimeUnit.SECONDS);
ReconnectDUnitTest.savedSystem = ds.getReconnectedSystem();
InternalLocator locator = (InternalLocator)Locator.getLocator();
assertTrue("Expected system to be restarted", ds.getReconnectedSystem() != null);
assertTrue("Expected system to be running", ds.getReconnectedSystem().isConnected());
assertTrue("Expected there to be a locator", locator != null);
assertTrue("Expected locator to be restarted", !locator.isStopped());
failure = false;
return ds.getReconnectedSystem().getDistributedMember();
} catch (InterruptedException e) {
// NOTE(review): returns null here, which the assertNotSame(dm, newdm) below accepts
getLogWriter().warning("interrupted while waiting for reconnect");
return null;
} finally {
if (failure) {
ds.disconnect();
}
}
}
});
// NOTE(review): assertNotSame compares references. The two IDs come from separate
// vm.invoke calls and are presumably separately deserialized objects, so this likely
// always passes; an equals()-based check would be stronger - confirm before changing.
assertNotSame(dm, newdm);
// force another reconnect and show that stopReconnecting works
forceDisconnect(vm1);
boolean stopped = (Boolean)vm1.invoke(new SerializableCallable("wait for reconnect and stop") {
public Object call() {
final DistributedSystem ds = ReconnectDUnitTest.savedSystem;
ReconnectDUnitTest.savedSystem = null;
waitForCriterion(new WaitCriterion() {
public boolean done() {
return ds.isReconnecting() || ds.getReconnectedSystem() != null;
}
public String description() {
return "waiting for reconnect to commence in " + ds;
}
}, 10000, 1000, true);
ds.stopReconnecting();
assertFalse(ds.isReconnecting());
// after stopReconnecting no distributed system should remain in this VM
DistributedSystem newDs = InternalDistributedSystem.getAnyInstance();
if (newDs != null) {
getLogWriter().warning("expected distributed system to be disconnected: " + newDs);
return false;
}
return true;
}
});
assertTrue("expected DistributedSystem to disconnect", stopped);
// recreate the system in vm1 without a locator and crash it
dm = (DistributedMember)vm1.invoke(create1);
forceDisconnect(vm1);
newdm = waitForReconnect(vm1);
assertNotSame("expected a reconnect to occur in member", dm, newdm);
deleteLocatorStateFile(locPort);
deleteLocatorStateFile(secondLocPort);
}
/**
 * Looks up the distributed system currently running in {@code vm}, remembers it in that VM's
 * {@link #savedSystem}, and returns its member ID.
 */
private DistributedMember getDMID(VM vm) {
  SerializableCallable recordMemberId = new SerializableCallable("get ID") {
    public Object call() {
      DistributedSystem current = InternalDistributedSystem.getAnyInstance();
      ReconnectDUnitTest.savedSystem = current;
      return current.getDistributedMember();
    }
  };
  return (DistributedMember) vm.invoke(recordMemberId);
}
/**
 * Waits in {@code vm} for the system previously stored in {@link #savedSystem} to auto-reconnect.
 *
 * <p>Steps:
 * <ol>
 * <li>Takes the stored system and clears the field.</li>
 * <li>Waits up to 30 seconds (polling every second) for the system to report that it is
 *     reconnecting.</li>
 * <li>Waits up to 120 seconds for the reconnect to finish.</li>
 * </ol>
 *
 * @return the member ID of the reconnected system
 */
private DistributedMember waitForReconnect(VM vm) {
  SerializableCallable awaitReconnect = new SerializableCallable("wait for Reconnect and return ID") {
    public Object call() {
      System.out.println("waitForReconnect invoked");
      final DistributedSystem original = ReconnectDUnitTest.savedSystem;
      ReconnectDUnitTest.savedSystem = null;
      WaitCriterion reconnectStarted = new WaitCriterion() {
        public boolean done() {
          return original.isReconnecting();
        }
        public String description() {
          return "waiting for ds to begin reconnecting";
        }
      };
      waitForCriterion(reconnectStarted, 30000, 1000, true);
      final long waitSeconds = 120;
      getLogWriter().info("VM" + VM.getCurrentVMNum() + " waiting up to " + waitSeconds
          + " seconds for reconnect to complete");
      try {
        original.waitUntilReconnected(waitSeconds, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        fail("interrupted while waiting for reconnect");
      }
      DistributedSystem reconnected = original.getReconnectedSystem();
      assertTrue("expected system to be reconnected", reconnected != null);
      return reconnected.getDistributedMember();
    }
  };
  return (DistributedMember) vm.invoke(awaitReconnect);
}
/**
 * Verifies that a member hosting a locator restarts that locator when it auto-reconnects.
 *
 * Setup:
 * - vm0 starts a second locator (with a distributed system) on secondLocPort.
 * - vm1 creates a cache so there is more weight in the system.
 *
 * vm0 is then force-disconnected. After it reconnects, the test checks that:
 * - vm0 hosts a running (not stopped) locator within 30 seconds;
 * - the returned member ID differs (via assertNotSame);
 * - the "views.log" files of both locators have grown.
 *
 * The second locator is stopped and both locator state files are deleted in the finally block.
 */
public void testReconnectALocator() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// NOTE(review): hard-codes VM 3; setUp() starts the locator in VM locatorVMNumber (also 3)
VM vm3 = host.getVM(3);
DistributedMember dm, newdm;
final int locPort = locatorPort;
final int secondLocPort = AvailablePortHelper.getRandomAvailableTCPPort();
deleteLocatorStateFile(locPort, secondLocPort);
final String xmlFileLoc = (new File(".")).getAbsolutePath();
//This locator was started in setUp.
File locatorViewLog = new File(vm3.getWorkingDirectory(), "locator"+locatorPort+"views.log");
assertTrue("Expected to find " + locatorViewLog.getPath() + " file", locatorViewLog.exists());
// baseline size; the assertions at the end expect it to grow after the reconnect
long logSize = locatorViewLog.length();
vm0.invoke(new SerializableRunnable("Create a second locator") {
public void run() throws CacheException
{
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put("max-wait-time-reconnect", "1000");
props.put("max-num-reconnect-tries", "2");
// NOTE(review): appends the primary locator (locPort), which the locators property already
// lists as localHost[locPort]; possibly secondLocPort was intended - confirm before changing
props.put("locators", props.get("locators")+",localhost["+locPort+"]");
props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
try {
Locator.startLocatorAndDS(secondLocPort, null, props);
} catch (IOException e) {
fail("exception starting locator", e);
}
}
});
File locator2ViewLog = new File(vm0.getWorkingDirectory(), "locator"+secondLocPort+"views.log");
assertTrue("Expected to find " + locator2ViewLog.getPath() + " file", locator2ViewLog.exists());
long log2Size = locator2ViewLog.length();
// create a cache in vm1 so there is more weight in the system
SerializableCallable create1 = new SerializableCallable(
"Create Cache and Regions from cache.xml") {
public Object call() throws CacheException
{
// DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
props.put("max-wait-time-reconnect", "1000");
props.put("max-num-reconnect-tries", "2");
ReconnectDUnitTest.savedSystem = getSystem(props);
Cache cache = getCache();
Region myRegion = cache.getRegion("root/myRegion");
myRegion.put("MyKey1", "MyValue1");
// myRegion.put("Mykey2", "MyValue2");
return savedSystem.getDistributedMember();
}
};
vm1.invoke(create1);
try {
// getDMID also stores vm0's system in savedSystem for waitForReconnect
dm = getDMID(vm0);
forceDisconnect(vm0);
newdm = waitForReconnect(vm0);
// poll (without failing on timeout) for up to 30s for a locator, then check its state
boolean running = (Boolean)vm0.invoke(new SerializableCallable("check for running locator") {
public Object call() {
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return Locator.getLocator() != null;
}
public String description() {
return "waiting for locator to restart";
}
};
waitForCriterion(wc, 30000, 1000, false);
if (Locator.getLocator() == null) {
getLogWriter().error("expected to find a running locator but getLocator() returns null");
return false;
}
if (((InternalLocator)Locator.getLocator()).isStopped()) {
getLogWriter().error("found a stopped locator");
return false;
}
return true;
}
});
if (!running) {
fail("expected the restarted member to be hosting a running locator");
}
assertNotSame("expected a reconnect to occur in the locator", dm, newdm);
// the log should have been opened and appended with a new view
assertTrue("expected " + locator2ViewLog.getPath() + " to grow in size",
locator2ViewLog.length() > log2Size);
// the other locator should have logged a new view
assertTrue("expected " + locatorViewLog.getPath() + " to grow in size",
locatorViewLog.length() > logSize);
} finally {
vm0.invoke(new SerializableRunnable("stop locator") {
public void run() {
Locator loc = Locator.getLocator();
if (loc != null) {
loc.stop();
}
}
});
deleteLocatorStateFile(locPort);
deleteLocatorStateFile(secondLocPort);
}
}
/**
 * Tests reconnect triggered by a reliability policy when required roles are missing, checking
 * that reconnect is attempted the configured number of times.
 *
 * <p>This (controller) VM writes RoleReconnect-cache.xml describing a distributed-ack root
 * region "MyRegion" that requires roles RoleA and RoleB with LossAction.RECONNECT.
 *
 * <p>vm0 then connects using that XML with max-wait-time-reconnect=200 and
 * max-num-reconnect-tries=3. No member is given those roles here, so getCache() is expected to
 * throw a CancelException. The test then waits up to 60 seconds for {@link #reconnectTries} to
 * reach 3 and asserts it equals 3. Presumably the counter is incremented by the listener that
 * addReconnectListener() installs — TODO confirm.
 */
public void testReconnectWithRoleLoss() throws TimeoutException,
    RegionExistsException {
  final String rr1 = "RoleA";
  final String rr2 = "RoleB";
  final String[] requiredRoles = { rr1, rr2 };
  final int locPort = locatorPort;
  final String xmlFileLoc = (new File(".")).getAbsolutePath();
  beginCacheXml();
  locatorPort = locPort;
  Properties config = getDistributedSystemProperties();
  config.put(DistributionConfig.ROLES_NAME, "");
  config.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
  config.put("log-file", "roleLossController.log");
  //creating the DS
  getSystem(config);
  MembershipAttributes ra = new MembershipAttributes(requiredRoles,
      LossAction.RECONNECT, ResumptionAction.NONE);
  AttributesFactory fac = new AttributesFactory();
  fac.setMembershipAttributes(ra);
  fac.setScope(Scope.DISTRIBUTED_ACK);
  RegionAttributes attr = fac.create();
  createRootRegion("MyRegion", attr);
  // Write the cache.xml file. The writer is closed even if generation throws.
  File file = new File("RoleReconnect-cache.xml");
  try {
    PrintWriter pw = new PrintWriter(new FileWriter(file), true);
    try {
      CacheXmlGenerator.generate(getCache(), pw);
    } finally {
      pw.close();
    }
  }
  catch (IOException ex) {
    fail("IOException during cache.xml generation to " + file, ex);
  }
  closeCache();
  getSystem().disconnect();
  getLogWriter().info("disconnected from the system...");
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  // Recreating from the cachexml.
  SerializableRunnable roleLoss = new CacheSerializableRunnable(
      "ROLERECONNECTTESTS") {
    public void run2() throws CacheException, RuntimeException
    {
      getLogWriter().info("####### STARTING THE REAL TEST ##########");
      locatorPort = locPort;
      Properties props = getDistributedSystemProperties();
      props.put("cache-xml-file", xmlFileLoc+File.separator+"RoleReconnect-cache.xml");
      props.put("max-wait-time-reconnect", "200");
      final int timeReconnect = 3;
      props.put("max-num-reconnect-tries", "3");
      props.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
      props.put("log-file", "roleLossVM0.log");
      getSystem(props);
      addReconnectListener();
      // The marker needs a well-formed closing tag ("</ExpectedException>"); the previous
      // text lacked the final '>', so the log scanner presumably never matched it.
      system.getLogWriter().info("<ExpectedException action=add>"
          + "CacheClosedException" + "</ExpectedException>");
      try{
        getCache();
        throw new RuntimeException("The test should throw a CancelException ");
      }
      catch (CancelException expected){ // can be caused by role loss during initialization.
        getLogWriter().info("Got Expected CancelException ");
      }
      finally {
        system.getLogWriter().info("<ExpectedException action=remove>"
            + "CacheClosedException" + "</ExpectedException>");
      }
      getLogWriter().fine("roleLoss Sleeping SO call dumprun.sh");
      WaitCriterion ev = new WaitCriterion() {
        public boolean done() {
          return reconnectTries >= timeReconnect;
        }
        public String description() {
          return "Waiting for reconnect count " + timeReconnect + " currently " + reconnectTries;
        }
      };
      DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
      getLogWriter().fine("roleLoss done Sleeping");
      assertEquals(timeReconnect,
          reconnectTries);
    }
  };
  vm0.invoke(roleLoss);
}
// Per-VM count of reconnect attempts observed by the role-loss tests; reset to 0 by
// DISABLED_testReconnectWithRequiredRoleRegained and read via reconnectTries().
// NOTE(review): presumably incremented by the listener added in addReconnectListener() - confirm.
public static volatile int reconnectTries;
// Set to true by the role-loss runnable once its cache creation attempt has finished.
public static volatile boolean initialized = false;
// Polled from another VM via isInitialRolePlayerStarted(); set outside this view.
public static volatile boolean initialRolePlayerStarted = false;
//public static boolean rPut;
/**
 * Returns the current value of {@link #reconnectTries} in this VM.
 *
 * <p>Invoked reflectively from other VMs, e.g.
 * {@code vm0.invoke(ReconnectDUnitTest.class, "reconnectTries")}, so it must remain public and
 * static and keep its boxed return type.
 */
public static Integer reconnectTries(){
  // Integer.valueOf reuses cached boxes instead of the deprecated Integer(int) constructor.
  return Integer.valueOf(reconnectTries);
}
/**
 * Returns the current value of {@link #initialized} in this VM, boxed. It is presumably
 * intended for reflective {@code VM.invoke} calls like the other accessors; confirm.
 */
public static Boolean isInitialized(){
  // Boolean.valueOf returns the shared TRUE/FALSE instances instead of allocating.
  return Boolean.valueOf(initialized);
}
/**
 * Returns the current value of {@link #initialRolePlayerStarted} in this VM.
 *
 * <p>Polled reflectively from another VM, e.g.
 * {@code otherVM.invoke(ReconnectDUnitTest.class, "isInitialRolePlayerStarted")}, so it must
 * remain public and static and return a {@link Boolean}.
 */
public static Boolean isInitialRolePlayerStarted(){
  // Boolean.valueOf returns the shared TRUE/FALSE instances instead of allocating.
  return Boolean.valueOf(initialRolePlayerStarted);
}
/**
 * Tests that a member which lost its required role (RoleA) and is attempting to reconnect
 * succeeds once a member playing that role is available again.
 *
 * <p>Disabled: see bug #50944 before enabling it. That ticket was closed as "won't fix" for
 * the 2014 8.0 release.
 *
 * <p>Outline, as visible in this method:
 * <ol>
 * <li>This (controller) VM writes RoleRegained.xml: a replicated, distributed-ack root region
 *     "MyRegion" requiring RoleA with LossAction.RECONNECT.</li>
 * <li>vm0's {@link #reconnectTries} counter is reset to 0.</li>
 * <li>vm1 asynchronously runs the initial RoleA player. Per its startup message, it
 *     initializes, waits for vm0 and then closes its cache to cause role loss.</li>
 * <li>vm0 asynchronously runs the role-loss member.</li>
 * <li>Once vm0 has counted a reconnect attempt (or its task has ended), vm2 runs another RoleA
 *     player, after which vm0 is expected to reconnect. The test waits up to 120 seconds for
 *     vm0 to finish.</li>
 * <li>An exception from either asynchronous invocation fails the test.</li>
 * </ol>
 */
public void DISABLED_testReconnectWithRequiredRoleRegained()throws Throwable {
  final String rr1 = "RoleA";
  final String[] requiredRoles = { rr1 };
  final String myKey = "MyKey";
  final String myValue = "MyValue";
  final String regionName = "MyRegion";
  final int locPort = locatorPort;
  // CREATE XML FOR MEMBER THAT WILL SEE ROLE LOSS (in this VM)
  beginCacheXml();
  locatorPort = locPort;
  Properties config = getDistributedSystemProperties();
  config.put(DistributionConfig.ROLES_NAME, "");
  config.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
  //creating the DS
  getSystem(config);
  MembershipAttributes ra = new MembershipAttributes(requiredRoles,
      LossAction.RECONNECT, ResumptionAction.NONE);
  AttributesFactory fac = new AttributesFactory();
  fac.setMembershipAttributes(ra);
  fac.setScope(Scope.DISTRIBUTED_ACK);
  fac.setDataPolicy(DataPolicy.REPLICATE);
  RegionAttributes attr = fac.create();
  createRootRegion(regionName, attr);
  // Write the cache.xml file. The writer is closed even if generation throws.
  File file = new File("RoleRegained.xml");
  try {
    PrintWriter pw = new PrintWriter(new FileWriter(file), true);
    try {
      CacheXmlGenerator.generate(getCache(), pw);
    } finally {
      pw.close();
    }
  }
  catch (IOException ex) {
    fail("IOException during cache.xml generation to " + file, ex);
  }
  closeCache();
  getSystem().disconnect();

  Host host = Host.getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  vm0.invoke(new CacheSerializableRunnable("reset reconnect count") {
    @Override
    public void run2() throws CacheException {
      reconnectTries = 0;
    }
  });
  SerializableRunnable roleAPlayerForCacheInitialization =
      getRoleAPlayerForCacheInitializationRunnable(vm0, locPort, regionName,
          "starting roleAplayer, which will initialize, wait for "
          + "vm0 to initialize, and then close its cache to cause role loss");
  AsyncInvocation avkVm1 = vm1.invokeAsync(roleAPlayerForCacheInitialization);
  CacheSerializableRunnable roleLoss = getRoleLossRunnable(vm1, locPort, regionName, myKey, myValue,
      "starting role loss vm. When the role is lost it will start"
      + " trying to reconnect");
  final AsyncInvocation roleLossAsync = vm0.invokeAsync(roleLoss);
  getLogWriter().info("waiting for role loss vm to start reconnect attempts");
  WaitCriterion ev = new WaitCriterion() {
    public boolean done() {
      // Done once vm0's task has ended or vm0 has counted at least one reconnect attempt.
      if (!roleLossAsync.isAlive()) {
        return true;
      }
      Object res = vm0.invoke(ReconnectDUnitTest.class, "reconnectTries");
      if (((Integer)res).intValue() != 0) {
        return true;
      }
      return false;
    }
    public String description() {
      return "waiting for event";
    }
  };
  DistributedTestCase.waitForCriterion(ev, 120 * 1000, 200, true);
  VM vm2 = host.getVM(2);
  if (roleLossAsync.isAlive()) {
    SerializableRunnable roleAPlayer = getRoleAPlayerRunnable(locPort, regionName, myKey, myValue,
        "starting roleAPlayer in a different vm."
        + " After this reconnect should succeed in vm0");
    vm2.invoke(roleAPlayer);
    getLogWriter().info("waiting for vm0 to finish reconnecting");
    DistributedTestCase.join(roleLossAsync, 120 * 1000, getLogWriter());
  }
  if (roleLossAsync.getException() != null){
    fail("Exception in Vm0", roleLossAsync.getException());
  }
  DistributedTestCase.join(avkVm1, 30 * 1000, getLogWriter());
  if (avkVm1.getException() != null){
    fail("Exception in Vm1", avkVm1.getException());
  }
}
/**
 * Builds the "roleloss runnable": the member that joins with the
 * {@code RoleRegained.xml} cache configuration and is expected to go through
 * reconnect attempts before seeing {@code myKey -> myValue} in {@code regionName}.
 *
 * <p>Steps: wait (10s) for {@code otherVM} to report that the initial role player
 * has started; connect with reconnect settings (3000 ms wait, 8 tries) and create
 * the cache; register a reconnect listener; wait (30s) for at least one reconnect
 * attempt; then wait (60s) until the reconnect count is back to zero and the region
 * holds the expected entry. Finally the (possibly new) distributed system is
 * disconnected.
 *
 * @param otherVM VM polled via {@code isInitialRolePlayerStarted} before starting
 * @param locPort locator port stored in {@code locatorPort} before connecting
 * @param regionName name of the region that must contain the expected entry
 * @param myKey key expected in the region after reconnect
 * @param myValue value expected for {@code myKey} after reconnect
 * @param startupMessage message logged as the runnable starts
 * @return the runnable to invoke in the role-loss VM
 */
private CacheSerializableRunnable getRoleLossRunnable(final VM otherVM, final int locPort,
    final String regionName, final String myKey, final Object myValue,
    final String startupMessage) {
  return new CacheSerializableRunnable("roleloss runnable") {
    public void run2()
    {
      try {
        getLogWriter().info(startupMessage);
        // Do not start until the other VM's initial role player is up.
        WaitCriterion ev = new WaitCriterion() {
          public boolean done() {
            return ((Boolean)otherVM.invoke(ReconnectDUnitTest.class, "isInitialRolePlayerStarted")).booleanValue();
          }
          public String description() {
            return "waiting for the initial role player to start in " + otherVM;
          }
        };
        DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
        getLogWriter().info("Starting the test and creating the cache and regions etc ...");
        locatorPort = locPort;
        Properties props = getDistributedSystemProperties();
        props.put("cache-xml-file", "RoleRegained.xml");
        props.put("max-wait-time-reconnect", "3000");
        props.put("max-num-reconnect-tries", "8");
        props.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
        getSystem(props);
        // Tell greplogs that CacheClosedException is expected during this phase.
        system.getLogWriter().info("<ExpectedException action=add>"
            + "CacheClosedException" + "</ExpectedException>");
        try {
          getCache();
        } catch (CancelException e) {
          // can happen if RoleA goes away during initialization
          getLogWriter().info("cache threw CancelException while creating the cache");
        }
        initialized = true;
        addReconnectListener();
        // Wait until the reconnect listener has observed at least one attempt.
        ev = new WaitCriterion() {
          public boolean done() {
            getLogWriter().info("ReconnectTries=" + reconnectTries);
            return reconnectTries != 0;
          }
          public String description() {
            return "waiting for a reconnect attempt; ReconnectTries=" + reconnectTries;
          }
        };
        DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
        // Wait until reconnect has completed and the region holds the expected entry.
        ev = new WaitCriterion() {
          String excuse;
          public boolean done() {
            int reconnectCount = InternalDistributedSystem.getReconnectCount();
            if (reconnectCount != 0) {
              // report the counter that is actually being checked
              excuse = "reconnectCount is " + reconnectCount
                  + " waiting for it to be zero";
              return false;
            }
            Object key = null;
            Object value = null;
            Region.Entry keyValue = null;
            try {
              Cache cache = CacheFactory.getAnyInstance();
              if (cache == null) {
                excuse = "no cache";
                return false;
              }
              Region myRegion = cache.getRegion(regionName);
              if (myRegion == null) {
                excuse = "no region";
                return false;
              }
              // Only the last entry iterated is checked against the expected pair.
              Set keyValuePair = myRegion.entrySet();
              Iterator it = keyValuePair.iterator();
              while (it.hasNext()) {
                keyValue = (Region.Entry)it.next();
                key = keyValue.getKey();
                value = keyValue.getValue();
              }
              if (key == null) {
                excuse = "key is null";
                return false;
              }
              if (!myKey.equals(key)) {
                excuse = "key is wrong";
                return false;
              }
              if (value == null) {
                excuse = "value is null";
                return false;
              }
              if (!myValue.equals(value)) {
                excuse = "value is wrong";
                return false;
              }
              getLogWriter().info("All assertions passed");
              getLogWriter().info("MyKey : "+key+" and myvalue : "+value);
              return true;
            }
            catch (CancelException ecc) {
              // The cache can be closed/null at times while reconnecting; keep waiting
              // but record why so a timeout reports something meaningful.
              excuse = "cache is closed or closing: " + ecc;
            }
            catch (RegionDestroyedException rex) {
              // The region can be destroyed while the cache is being rebuilt; keep waiting.
              excuse = "region was destroyed: " + rex;
            }
            finally {
              getLogWriter().info("waiting for reconnect. Current status is '"+excuse+"'");
            }
            return false;
          }
          public String description() {
            return excuse;
          }
        };
        DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true); // was 5 * 60 * 1000
        Cache cache = CacheFactory.getAnyInstance();
        if (cache != null) {
          cache.getDistributedSystem().disconnect();
        }
      }
      catch (VirtualMachineError e) {
        SystemFailure.initiateFailure(e);
        throw e;
      }
      catch (Error th) {
        getLogWriter().severe("DEBUG", th);
        throw th;
      } finally {
        // greplogs won't care if you remove an exception that was never added,
        // and this ensures that it gets removed. Guard against a null system so a
        // failure in getSystem() is not masked by a NullPointerException here.
        if (system != null) {
          system.getLogWriter().info("<ExpectedException action=remove>"
              + "CacheClosedException" + "</ExpectedException>");
        }
      }
    }
  }; // roleloss runnable
}
/**
 * Builds the "second RoleA player" runnable. It connects with
 * {@code roles=RoleA}, creates a DISTRIBUTED_ACK / REPLICATE root region named
 * {@code regionName}, puts {@code myKey -> myValue}, and then stays connected for
 * 5 seconds before returning.
 *
 * @param locPort locator port stored in {@code locatorPort} before connecting
 * @param regionName name of the root region to create
 * @param myKey key to put
 * @param myValue value to put
 * @param startupMessage message logged as the runnable starts
 * @return the runnable to invoke in the role player VM
 */
private CacheSerializableRunnable getRoleAPlayerRunnable(
    final int locPort, final String regionName, final String myKey, final String myValue,
    final String startupMessage) {
  return new CacheSerializableRunnable(
      "second RoleA player") {
    public void run2() throws CacheException
    {
      getLogWriter().info(startupMessage);
      locatorPort = locPort;
      Properties props = getDistributedSystemProperties();
      props.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
      props.put(DistributionConfig.ROLES_NAME, "RoleA");
      getSystem(props);
      getCache();
      AttributesFactory fac = new AttributesFactory();
      fac.setScope(Scope.DISTRIBUTED_ACK);
      fac.setDataPolicy(DataPolicy.REPLICATE);
      RegionAttributes attr = fac.create();
      Region region = createRootRegion(regionName, attr);
      getLogWriter().info("STARTED THE REQUIREDROLES CACHE");
      try {
        Thread.sleep(120);
      }
      catch (InterruptedException ee) {
        // Preserve the interrupt status for the caller before failing.
        Thread.currentThread().interrupt();
        fail("interrupted");
      }
      region.put(myKey,myValue);
      try {
        // TODO(review): why sleep 5 seconds here? If it is to give the role-loss
        // VM time to notice this member, that VM should signal us instead.
        Thread.sleep(5000);
      }
      catch (InterruptedException ee) {
        Thread.currentThread().interrupt();
        fail("interrupted");
      }
      getLogWriter().info("RolePlayer is done...");
    }
  };
}
/**
 * Builds the "first RoleA player" runnable. It connects with
 * {@code roles=RoleA}, creates a DISTRIBUTED_ACK / REPLICATE root region named
 * {@code regionName}, and sets {@code initialRolePlayerStarted}. It then polls
 * {@code otherVM} every 15 ms (with no timeout) until
 * {@code isInitialized} returns true, and finally closes its cache.
 *
 * @param otherVM VM polled via {@code isInitialized}
 * @param locPort locator port stored in {@code locatorPort} before connecting
 * @param regionName name of the root region to create
 * @param startupMessage message logged as the runnable starts
 * @return the runnable to invoke in the initial role player VM
 */
private CacheSerializableRunnable getRoleAPlayerForCacheInitializationRunnable(
    final VM otherVM, final int locPort, final String regionName,
    final String startupMessage) {
  return new CacheSerializableRunnable(
      "first RoleA player") {
    public void run2() throws CacheException
    {
      getLogWriter().info(startupMessage);
      locatorPort = locPort;
      Properties props = getDistributedSystemProperties();
      props.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
      props.put(DistributionConfig.ROLES_NAME, "RoleA");
      getSystem(props);
      getCache();
      AttributesFactory fac = new AttributesFactory();
      fac.setScope(Scope.DISTRIBUTED_ACK);
      fac.setDataPolicy(DataPolicy.REPLICATE);
      RegionAttributes attr = fac.create();
      createRootRegion(regionName, attr);
      getLogWriter().info("STARTED THE REQUIREDROLES CACHE");
      initialRolePlayerStarted = true;
      // Keep this role player alive until the other VM has finished initializing.
      while (!((Boolean)otherVM.invoke(ReconnectDUnitTest.class, "isInitialized")).booleanValue()) {
        try {
          Thread.sleep(15);
        } catch (InterruptedException interrupted) {
          // Preserve the interrupt status for the caller before failing.
          Thread.currentThread().interrupt();
          fail("interrupted");
        }
      }
      getLogWriter().info("RoleAPlayerInitializer is done...");
      closeCache();
    }
  };
}
/**
 * Resets {@code reconnectTries} to zero and registers a reconnect listener that
 * increments it each time {@code reconnecting} is invoked. The
 * {@code onReconnect} callback does nothing.
 */
void addReconnectListener() {
  // Count only attempts observed by this newly registered listener.
  reconnectTries = 0;
  getLogWriter().info("adding reconnect listener");
  InternalDistributedSystem.addReconnectListener(new ReconnectListener() {
    public void reconnecting(InternalDistributedSystem oldSys) {
      getLogWriter().info("reconnect listener invoked");
      reconnectTries++;
    }

    public void onReconnect(InternalDistributedSystem system1, InternalDistributedSystem system2) {
      // intentionally empty: only reconnect attempts are counted
    }
  });
}
/**
 * Blocks the calling thread for half a second.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void waitTimeout() throws InterruptedException {
  TimeUnit.MILLISECONDS.sleep(500);
}
/**
 * Forces the distributed system in {@code vm} out of the membership.
 *
 * <p>In the target VM: clears {@code DistributedTestCase.system}, makes the
 * member play dead, and passes an EXIT event carrying a
 * {@link ForcedDisconnectException} up from the channel's "UDP" protocol. If a
 * locator was running there beforehand, it then waits up to 10 seconds for that
 * locator to report that it has stopped.
 *
 * @param vm the VM whose member should be disconnected
 * @return the boolean returned by the remote callable (always {@code true} on success)
 */
public boolean forceDisconnect(VM vm) {
  SerializableCallable crashSystem = new SerializableCallable("crash distributed system") {
    public Object call() throws Exception {
      // The member will be kicked out and will try to reconnect with a new system,
      // so the stale DistributedTestCase.system reference is cleared first.
      DistributedTestCase.system = null;
      final DistributedSystem victim = InternalDistributedSystem.getAnyInstance();
      final Locator locatorBeforeCrash = Locator.getLocator();
      MembershipManagerHelper.playDead(victim);
      Protocol udpProtocol =
          MembershipManagerHelper.getJChannel(victim).getProtocolStack().findProtocol("UDP");
      udpProtocol.passUp(new Event(Event.EXIT, new ForcedDisconnectException("killing member's ds")));
      if (locatorBeforeCrash == null) {
        return true;
      }
      WaitCriterion locatorStopped = new WaitCriterion() {
        public boolean done() {
          return ((InternalLocator)locatorBeforeCrash).isStopped();
        }
        public String description() {
          return "waiting for locator to stop: " + locatorBeforeCrash;
        }
      };
      waitForCriterion(locatorStopped, 10000, 50, true);
      return true;
    }
  };
  Boolean crashed = (Boolean) vm.invoke(crashSystem);
  return crashed.booleanValue();
}
/**
 * Returns this JVM's process id, parsed from the prefix before {@code '@'} in
 * {@code RuntimeMXBean.getName()}.
 *
 * <p>The format of that name is implementation-specific. If it contains no
 * {@code '@'}, has an empty prefix, or the prefix is not an integer, this method
 * returns 0 instead of throwing.
 *
 * @return the parsed process id, or 0 if it cannot be determined
 */
private static int getPID() {
  String name = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
  int idx = name.indexOf('@');
  if (idx <= 0) {
    // No "pid@host" shape: previously substring(0, -1) threw StringIndexOutOfBoundsException.
    return 0;
  }
  try {
    return Integer.parseInt(name.substring(0, idx));
  } catch (NumberFormatException nfe) {
    // something changed in the RuntimeMXBean name
    return 0;
  }
}
}