blob: b93e2348ec19925134e743ae28874ae20b855ae3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static java.lang.System.out;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.cache.DataPolicy.REPLICATE;
import static org.apache.geode.cache.LossAction.RECONNECT;
import static org.apache.geode.cache.ResumptionAction.NONE;
import static org.apache.geode.cache.Scope.DISTRIBUTED_ACK;
import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MAX_NUM_RECONNECT_TRIES;
import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.ROLES;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.distributed.Locator.getLocator;
import static org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper.getMembershipManager;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.ThreadUtils.join;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.LossAction;
import org.apache.geode.cache.MembershipAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.ResumptionAction;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem.ReconnectListener;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocator;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import org.apache.geode.examples.SimpleSecurityManager;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.MembershipTest;
@SuppressWarnings("serial")
@Category({MembershipTest.class})
public class ReconnectDUnitTest extends JUnit4CacheTestCase {
// Port of the locator started in postSetUp; copied into each VM before the DS properties are built.
static int locatorPort;
// Locator reference for the current VM (assigned when a locator is started or looked up by a test).
static Locator locator;
// Distributed system a test wants to watch for reconnect; the wait-for-reconnect helpers read and
// clear it.
static DistributedSystem savedSystem;
// Cleared during tear-down; no assignment other than null is visible in this part of the file.
static GemFireCacheImpl savedCache;
// Index of the dunit VM in which postSetUp starts the locator.
static int locatorVMNumber = 3;
// Daemon thread created by createGfshWaitingThread; it blocks in InternalLocator.waitToStop().
static Thread gfshThread;
// Cached result of getDistributedSystemProperties(); set to null to force the defaults to be rebuilt.
static Properties dsProperties;
// Platform file separator used when building cache.xml paths.
static String fileSeparator = File.separator;
public ReconnectDUnitTest() {
super();
}
/**
 * Per-test set-up: picks a random locator port, starts a locator in the locator VM, rebuilds the
 * cached distributed-system properties in this VM and in every dunit VM, and finally generates
 * the "MyDisconnect" cache XML containing a single region named "myRegion".
 */
@Override
public final void postSetUp() throws Exception {
  locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  final int chosenPort = locatorPort;

  SerializableRunnable startLocator = new SerializableRunnable("start locator") {
    @Override
    public void run() {
      try {
        disconnectFromDS();
        // force the defaults to be rebuilt against the freshly chosen port
        dsProperties = null;
        locatorPort = chosenPort;
        Properties locatorProps = getDistributedSystemProperties();
        locator = Locator.startLocatorAndDS(locatorPort, new File(""), locatorProps);
        system = (InternalDistributedSystem) locator.getDistributedSystem();
        cache = ((InternalLocator) locator).getCache();
        ReconnectDUnitTest.savedSystem = locator.getDistributedSystem();
        IgnoredException.addIgnoredException(
            "org.apache.geode.ForcedDisconnectException||Possible loss of quorum");
      } catch (IOException e) {
        Assert.fail("unable to start locator", e);
      }
    }
  };
  Host.getHost(0).getVM(locatorVMNumber).invoke(startLocator);

  SerializableRunnable rebuildProperties =
      new SerializableRunnable("set distributed system properties") {
        @Override
        public void run() {
          dsProperties = null;
          locatorPort = chosenPort;
          getDistributedSystemProperties();
        }
      };
  // run locally first, then in every dunit VM
  rebuildProperties.run();
  Invoke.invokeInEveryVM(rebuildProperties);

  // write the cache XML used by the tests, then shut this VM's cache and system down again
  beginCacheXml();
  createRegion("myRegion", createAtts());
  finishCacheXml("MyDisconnect");
  closeCache();
  basicGetSystem().disconnect();
}
/**
 * Returns the distributed-system properties shared by the tests in this class, building them on
 * first use. The returned object is the cached static instance, so any {@code put} a caller
 * performs on it is visible to later callers in the same VM until {@code dsProperties} is reset
 * to null.
 */
@Override
public Properties getDistributedSystemProperties() {
  if (dsProperties != null) {
    return dsProperties;
  }
  // publish the instance before filling it, exactly as the defaults are then applied in order
  dsProperties = new Properties();
  dsProperties.put(MAX_WAIT_TIME_RECONNECT, "20000");
  dsProperties.put(ENABLE_NETWORK_PARTITION_DETECTION, "true");
  dsProperties.put(DISABLE_AUTO_RECONNECT, "false");
  dsProperties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
  dsProperties.put(LOCATORS, "localHost[" + locatorPort + "]");
  dsProperties.put(MCAST_PORT, "0");
  dsProperties.put(MEMBER_TIMEOUT, "1000");
  dsProperties.put(LOG_LEVEL, "info");
  dsProperties.put(SECURITY_MANAGER, SimpleSecurityManager.class.getName());
  dsProperties.put("security-username", "clusterManage");
  dsProperties.put("security-password", "clusterManage");
  addDSProps(dsProperties);
  return dsProperties;
}
/**
 * Per-test tear-down: stops any reconnect still in progress on the saved system, clears the
 * static test state in every dunit VM and in this VM, then disconnects all VMs.
 */
@Override
public final void postTearDownCacheTestCase() throws Exception {
  System.out.println("entering postTearDownCacheTestCase");
  SerializableRunnable cleanup = new SerializableRunnable("disconnect and clean up") {
    @Override
    public void run() {
      boolean reconnectInProgress = savedSystem != null && savedSystem.isReconnecting();
      if (reconnectInProgress) {
        savedSystem.stopReconnecting();
      }
      // drop every piece of static state a test may have left behind
      locatorPort = 0;
      locator = null;
      dsProperties = null;
      savedCache = null;
      savedSystem = null;
    }
  };
  Invoke.invokeInEveryVM(cleanup);
  cleanup.run();
  disconnectAllFromDS();
}
/**
 * Builds the attributes used for the regions created by this test: replicated data policy with
 * distributed-ack scope.
 */
private RegionAttributes createAtts() {
  final AttributesFactory attributes = new AttributesFactory();
  attributes.setDataPolicy(DataPolicy.REPLICATE);
  attributes.setScope(Scope.DISTRIBUTED_ACK);
  return attributes.create();
}
/**
 * Creates caches in vm0 and vm1, force-disconnects the locator VM and verifies that the locator
 * reconnects and that its location service is running again.
 */
@Test
public void testReconnectWithQuorum() throws Exception {
  // quorum check fails, then succeeds
  IgnoredException.addIgnoredException("killing member's ds");
  Host host = Host.getHost(0);
  VM firstMember = host.getVM(0);
  VM secondMember = host.getVM(1);
  VM locatorVm = host.getVM(locatorVMNumber);
  final int port = locatorPort;
  final String workingDir = (new File(".")).getAbsolutePath();

  // disable disconnects in the locator so we have some stability
  locatorVm.invoke(new SerializableRunnable("disable force-disconnect") {
    @Override
    public void run() {
      ((GMSMembershipManager) MembershipManagerHelper
          .getMembershipManager(Locator.getLocator().getDistributedSystem()))
              .disableDisconnectOnQuorumLossForTesting();
    }
  });

  SerializableCallable createCache =
      new SerializableCallable("Create Cache and Regions from cache.xml") {
        @Override
        public Object call() throws CacheException {
          locatorPort = port;
          Properties config = getDistributedSystemProperties();
          config.put(CACHE_XML_FILE, workingDir + fileSeparator + "MyDisconnect-cache.xml");
          config.put(MAX_NUM_RECONNECT_TRIES, "2");
          cache = (InternalCache) new CacheFactory(config).create();
          IgnoredException.addIgnoredException(
              "org.apache.geode.ForcedDisconnectException||Possible loss of quorum");
          Region region = cache.getRegion("root/myRegion");
          ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
          region.put("MyKey1", "MyValue1");
          return savedSystem.getDistributedMember();
        }
      };

  System.out.println("creating caches in vm0 and vm1");
  firstMember.invoke(createCache);
  secondMember.invoke(createCache);

  // The system now consists of the locator plus the two cache members created above.
  // Kick out the locator and observe that it reconnects using its rebooted location service.
  System.out.println("disconnecting locator");
  forceDisconnect(locatorVm);
  waitForReconnect(locatorVm);

  // if the locator reconnected it did so with its own location
  // service since it doesn't know about any other locators
  ensureLocationServiceRunning(locatorVm);
}
/** Runs the forced-disconnect scenario without an application thread re-creating the cache. */
@Test
public void testReconnectOnForcedDisconnect() throws Exception {
  final boolean applicationAlsoCreatesCache = false;
  doTestReconnectOnForcedDisconnect(applicationAlsoCreatesCache);
}
/**
 * bug #51335 - customer is also trying to recreate the cache. Runs the forced-disconnect scenario
 * with an application thread that tries to create a cache while auto-reconnect is in progress.
 */
// NOTE(review): an earlier comment here said this test is disabled due to a high failure rate
// during CI test runs (see bug #52160), yet the method carries @Test and no ignore annotation is
// visible - confirm whether it is meant to run.
@Test
public void testReconnectCollidesWithApplication() throws Exception {
  final boolean applicationAlsoCreatesCache = true;
  doTestReconnectOnForcedDisconnect(applicationAlsoCreatesCache);
}
/**
 * Shared body of the forced-disconnect tests.
 *
 * <p>
 * Creates a cache in vm0 and a cache with a co-located locator in vm1, force-disconnects vm1 and
 * verifies that it reconnects with a different member ID and a running locator. It then forces a
 * second disconnect to verify that {@code stopReconnecting()} prevents a reconnect, and finally
 * re-creates the cache in vm1 and checks that one more forced disconnect leads to a reconnect.
 *
 * @param createInAppToo when true, vm1 additionally starts a daemon thread that waits for the
 *        cache to close and then tries to create a cache itself; an IllegalStateException is the
 *        expected outcome of that attempt
 */
public void doTestReconnectOnForcedDisconnect(final boolean createInAppToo) throws Exception {
IgnoredException.addIgnoredException("killing member's ds");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final int locPort = locatorPort;
final int secondLocPort = AvailablePortHelper.getRandomAvailableTCPPort();
// remove locator state left over from earlier runs before anything is started
DistributedTestUtils.deleteLocatorStateFile(locPort, secondLocPort);
final String xmlFileLoc = (new File(".")).getAbsolutePath();
// create1: plain cache member built from the generated cache.xml
SerializableCallable create1 =
new SerializableCallable("Create Cache and Regions from cache.xml") {
@Override
public Object call() throws CacheException {
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put(CACHE_XML_FILE, xmlFileLoc + fileSeparator + "MyDisconnect-cache.xml");
props.put(MAX_WAIT_TIME_RECONNECT, "1000");
cache = (InternalCache) new CacheFactory(props).create();
Region myRegion = cache.getRegion("root/myRegion");
ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
myRegion.put("MyKey1", "MyValue1");
return savedSystem.getDistributedMember();
}
};
// create2: cache member that also starts a second locator on secondLocPort
SerializableCallable create2 =
new SerializableCallable("Create Cache and Regions from cache.xml") {
@Override
public Object call() throws CacheException {
locatorPort = locPort;
final Properties props = getDistributedSystemProperties();
props.put(CACHE_XML_FILE, xmlFileLoc + fileSeparator + "MyDisconnect-cache.xml");
props.put(MAX_WAIT_TIME_RECONNECT, "5000");
props.put(START_LOCATOR, "localhost[" + secondLocPort + "]");
props.put(LOCATORS, props.get(LOCATORS) + ",localhost[" + secondLocPort + "]");
getSystem(props);
cache = getCache();
ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
Region myRegion = cache.getRegion("root/myRegion");
myRegion.put("Mykey2", "MyValue2");
// the entry written by create1 must be visible in this member's region
assertNotNull(myRegion.get("MyKey1"));
if (createInAppToo) {
// simulate an application that notices the closed cache and tries to create a new one
Thread recreateCacheThread = new Thread("ReconnectDUnitTest.createInAppThread") {
@Override
public void run() {
while (!cache.isClosed()) {
Wait.pause(100);
}
try {
cache = (InternalCache) new CacheFactory(props).create();
// a successful create is only reported on stderr; it does not fail the test
System.err.println(
"testReconnectCollidesWithApplication failed - application thread was able to create a cache");
} catch (IllegalStateException cacheExists) {
// expected
}
}
};
recreateCacheThread.setDaemon(true);
recreateCacheThread.start();
}
return cache.getDistributedSystem().getDistributedMember();
}
};
vm0.invoke(create1);
final DistributedMember dm = (DistributedMember) vm1.invoke(create2);
IgnoredException.addIgnoredException("ForcedDisconnectException");
forceDisconnect(vm1);
DistributedMember newdm =
(DistributedMember) vm1.invoke(new SerializableCallable("wait for reconnect(1)") {
@Override
public Object call() {
final DistributedSystem ds = savedSystem;
savedSystem = null;
GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
@Override
public boolean done() {
return ds.isReconnecting();
}
@Override
public String description() {
return "waiting for ds to begin reconnecting";
}
});
System.out.println("entering reconnect wait for " + ds);
System.out.println("ds.isReconnecting() = " + ds.isReconnecting());
// stays true until the system and locator checks below have passed; the finally block
// disconnects the old system while it is still true
boolean failure = true;
try {
ds.waitUntilReconnected(getTimeout().getValueInMS(), MILLISECONDS);
savedSystem = ds.getReconnectedSystem();
locator = (InternalLocator) getLocator();
assertTrue("Expected system to be restarted", ds.getReconnectedSystem() != null);
assertTrue("Expected system to be running", ds.getReconnectedSystem().isConnected());
assertTrue("Expected there to be a locator", locator != null);
assertTrue("Expected locator to be restarted",
!((InternalLocator) locator).isStopped());
failure = false;
cache = ((InternalLocator) locator).getCache();
system = cache.getInternalDistributedSystem();
// the messenger of the new system is expected to recognise the pre-disconnect ID as old
assertTrue(
((GMSMembershipManager) getMembershipManager(system))
.getServices().getMessenger().isOldMembershipIdentifier(dm));
return ds.getReconnectedSystem().getDistributedMember();
} catch (InterruptedException e) {
System.err.println("interrupted while waiting for reconnect");
return null;
} finally {
if (failure) {
ds.disconnect();
}
}
}
});
assertNotSame(dm, newdm);
// force another reconnect and show that stopReconnecting works
forceDisconnect(vm1);
boolean stopped = (Boolean) vm1.invoke(new SerializableCallable("wait for reconnect and stop") {
@Override
public Object call() {
final DistributedSystem ds = savedSystem;
savedSystem = null;
// proceed as soon as the reconnect has started or has already completed
GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
@Override
public boolean done() {
return ds.isReconnecting() || ds.getReconnectedSystem() != null;
}
@Override
public String description() {
return "waiting for reconnect to commence in " + ds;
}
});
ds.stopReconnecting();
assertFalse(ds.isReconnecting());
DistributedSystem newDs = ds.getReconnectedSystem();
if (newDs != null) {
// a reconnected system exists despite stopReconnecting: clean it up and report failure
System.err.println("expected distributed system to be disconnected: " + newDs);
newDs.disconnect();
return false;
}
return true;
}
});
assertTrue("expected DistributedSystem to disconnect", stopped);
// recreate the system in vm1 without a locator and crash it
// NOTE(review): getDistributedSystemProperties() returns the cached static Properties, so the
// START_LOCATOR/LOCATORS values put there by create2 may still be present when create1 runs in
// vm1, unless something outside this view resets dsProperties - confirm.
DistributedMember evenNewerdm = (DistributedMember) vm1.invoke(create1);
forceDisconnect(vm1);
newdm = waitForReconnect(vm1);
assertNotSame("expected a reconnect to occur in member", evenNewerdm, newdm);
DistributedTestUtils.deleteLocatorStateFile(locPort);
DistributedTestUtils.deleteLocatorStateFile(secondLocPort);
}
/**
 * Records the given VM's current distributed system in {@code savedSystem} (inside that VM) and
 * returns the system's member ID.
 */
private DistributedMember getDMID(VM vm) {
  SerializableCallable fetchId = new SerializableCallable("get ID") {
    @Override
    public Object call() {
      ReconnectDUnitTest.savedSystem = cache.getDistributedSystem();
      return savedSystem.getDistributedMember();
    }
  };
  Object memberId = vm.invoke(fetchId);
  return (DistributedMember) memberId;
}
/**
 * Waits, inside the given VM, until the server locator advisee of that VM's {@code locator}
 * reports an initialized distribution advisor; the await fails if that never happens.
 */
private void ensureLocationServiceRunning(VM vm) {
  vm.invoke("ensureLocationServiceRunning", () -> {
    // the initialization flag in the locator's ControllerAdvisor will
    // be set if a handshake has been performed
    await().untilAsserted(() -> assertTrue(
        ((InternalLocator) locator).getServerLocatorAdvisee().getDistributionAdvisor()
            .isInitialized()));
  });
}
/**
 * Waits in the given VM for the system held in {@code savedSystem} to start and then finish
 * reconnecting, checks that the reconnected member has a larger view ID than before, and returns
 * the new member ID. {@code savedSystem} is cleared in that VM as a side effect.
 */
private DistributedMember waitForReconnect(VM vm) {
  SerializableCallable waiter = new SerializableCallable("wait for Reconnect and return ID") {
    @Override
    public Object call() {
      out.println("waitForReconnect invoked");
      final DistributedSystem oldSystem = savedSystem;
      savedSystem = null;

      GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
        @Override
        public boolean done() {
          return oldSystem.isReconnecting();
        }

        @Override
        public String description() {
          return "waiting for ds to begin reconnecting";
        }
      });

      final long timeoutSeconds = 600;
      System.out.println("VM" + VM.getCurrentVMNum() + " waiting up to "
          + timeoutSeconds + " seconds for reconnect to complete");
      try {
        oldSystem.waitUntilReconnected(timeoutSeconds, SECONDS);
      } catch (InterruptedException e) {
        fail("interrupted while waiting for reconnect");
      }
      assertTrue("expected system to be reconnected", oldSystem.getReconnectedSystem() != null);

      // a reconnect must produce a member with a newer view ID
      int previousViewId = getMembershipManager(oldSystem).getLocalMember().getVmViewId();
      int currentViewId =
          ((InternalDistributedMember) oldSystem.getReconnectedSystem().getDistributedMember())
              .getVmViewId();
      if (currentViewId <= previousViewId) {
        fail("expected a new ID to be assigned. oldViewId=" + previousViewId + "; newViewId="
            + currentViewId);
      }
      return oldSystem.getReconnectedSystem().getDistributedMember();
    }
  };
  return (DistributedMember) vm.invoke(waiter);
}
/**
 * Starts a second locator in vm0 and a cache in vm1, force-disconnects vm0 and verifies that it
 * reconnects with a new member ID, that a locator is running in it again, that the gfsh-like
 * waiting thread survives the reconnect, and that both locators' view log files have grown.
 */
@Test
public void testReconnectALocator() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM locatorVm = host.getVM(3);
DistributedMember dm, newdm;
final int locPort = locatorPort;
final int secondLocPort = AvailablePortHelper.getRandomAvailableTCPPort();
DistributedTestUtils.deleteLocatorStateFile(locPort, secondLocPort);
final String xmlFileLoc = (new File(".")).getAbsolutePath();
// This locator was started in setUp.
File locatorViewLog =
new File(locatorVm.getWorkingDirectory(), "locator" + locatorPort + "views.log");
assertTrue("Expected to find " + locatorViewLog.getPath() + " file", locatorViewLog.exists());
// remember the current size so growth can be checked after the reconnect
long logSize = locatorViewLog.length();
vm0.invoke("Create a second locator", () -> {
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put(MAX_WAIT_TIME_RECONNECT, "1000");
// NOTE(review): this appends locPort, which the default LOCATORS value already contains; the
// equivalent line in doTestReconnectOnForcedDisconnect appends secondLocPort - confirm which
// port is intended here.
props.put(LOCATORS, props.get(LOCATORS) + ",localhost[" + locPort + "]");
// the defaults built by getDistributedSystemProperties() already use "false" for this property
props.put(ENABLE_CLUSTER_CONFIGURATION, "false");
try {
InternalLocator locator =
(InternalLocator) Locator.startLocatorAndDS(secondLocPort, null, props);
system = (InternalDistributedSystem) locator.getDistributedSystem();
cache = locator.getCache();
} catch (IOException e) {
Assert.fail("exception starting locator", e);
}
});
File locator2ViewLog =
new File(vm0.getWorkingDirectory(), "locator" + secondLocPort + "views.log");
assertTrue("Expected to find " + locator2ViewLog.getPath() + " file", locator2ViewLog.exists());
long log2Size = locator2ViewLog.length();
// create a cache in vm1 so there is more weight in the system
vm1.invoke("Create Cache and Regions from cache.xml", () -> {
locatorPort = locPort;
Properties props = getDistributedSystemProperties();
props.put(CACHE_XML_FILE, xmlFileLoc + fileSeparator + "MyDisconnect-cache.xml");
props.put(MAX_WAIT_TIME_RECONNECT, "1000");
ReconnectDUnitTest.savedSystem = getSystem(props);
cache = getCache();
Region myRegion = cache.getRegion("root/myRegion");
myRegion.put("MyKey1", "MyValue1");
return savedSystem.getDistributedMember();
});
try {
// getDMID also stores vm0's system in savedSystem, which waitForReconnect reads below
dm = getDMID(vm0);
createGfshWaitingThread(vm0);
forceDisconnect(vm0);
newdm = waitForReconnect(vm0);
assertGfshWaitingThreadAlive(vm0);
assertTrue("Expected the restarted member to be hosting a running locator",
vm0.invoke("check for running locator", () -> {
await("waiting for locator to restart")
.until(Locator::getLocator, notNullValue());
if (((InternalLocator) Locator.getLocator()).isStopped()) {
System.err.println("found a stopped locator");
return false;
}
return true;
}));
assertNotSame("expected a reconnect to occur in the locator", dm, newdm);
// the log should have been opened and appended with a new view
assertTrue("expected " + locator2ViewLog.getPath() + " to grow in size",
locator2ViewLog.length() > log2Size);
// the other locator should have logged a new view
assertTrue("expected " + locatorViewLog.getPath() + " to grow in size",
locatorViewLog.length() > logSize);
} finally {
// always stop the second locator and the waiting thread, even when an assertion failed
vm0.invoke(new SerializableRunnable("stop locator") {
@Override
public void run() {
Locator loc = Locator.getLocator();
if (loc != null) {
loc.stop();
}
if (gfshThread != null && gfshThread.isAlive()) {
gfshThread.interrupt();
}
gfshThread = null;
}
});
DistributedTestUtils.deleteLocatorStateFile(locPort);
DistributedTestUtils.deleteLocatorStateFile(secondLocPort);
}
}
/**
 * Starts, in the given VM, a daemon thread that blocks in {@code InternalLocator.waitToStop()} on
 * that VM's current locator and stores the thread in {@code gfshThread}.
 */
@SuppressWarnings("serial")
private void createGfshWaitingThread(VM vm) {
  vm.invoke(new SerializableRunnable("create Gfsh-like waiting thread") {
    @Override
    public void run() {
      final Locator runningLocator = Locator.getLocator();
      assertNotNull(runningLocator);

      Thread waiter = new Thread(() -> {
        try {
          ((InternalLocator) runningLocator).waitToStop();
        } catch (InterruptedException e) {
          System.out.println(Thread.currentThread().getName() + " interrupted - exiting");
        }
      }, "ReconnectDUnitTest_Gfsh_thread");
      gfshThread = waiter;
      waiter.setDaemon(true);
      waiter.start();
      System.out.println("created gfsh thread: " + gfshThread);
    }
  });
}
/** Asserts, in the given VM, that the thread stored in {@code gfshThread} is still alive. */
@SuppressWarnings("serial")
private void assertGfshWaitingThreadAlive(VM vm) {
  vm.invoke("assert gfshThread is still waiting", () -> {
    assertTrue(gfshThread.isAlive());
  });
}
/**
 * Test the reconnect behavior when the required roles are missing. Reconnect is triggered as a
 * Reliability policy. The test is to see if the reconnect is triggered for the configured number
 * of times.
 *
 * <p>
 * This VM first generates "RoleReconnect-cache.xml" describing a region that requires roles
 * "RoleA" and "RoleB" with a RECONNECT loss action. vm0 then creates its cache from that file,
 * expects cache creation to be cancelled, and waits until the reconnect counter reaches the
 * configured number of tries.
 */
@Test
public void testReconnectWithRoleLoss() throws TimeoutException, RegionExistsException {
  final String rr1 = "RoleA";
  final String rr2 = "RoleB";
  final String[] requiredRoles = {rr1, rr2};
  final int locPort = locatorPort;
  final String xmlFileLoc = (new File(".")).getAbsolutePath();
  beginCacheXml();
  locatorPort = locPort;
  Properties config = getDistributedSystemProperties();
  config.put(ROLES, "");
  config.put(LOG_LEVEL, "info");
  // creating the DS
  getSystem(config);
  MembershipAttributes ra =
      new MembershipAttributes(requiredRoles, LossAction.RECONNECT, ResumptionAction.NONE);
  AttributesFactory fac = new AttributesFactory();
  fac.setMembershipAttributes(ra);
  fac.setScope(Scope.DISTRIBUTED_ACK);
  RegionAttributes attr = fac.create();
  createRootRegion("MyRegion", attr);

  // writing the cachexml file; try-with-resources closes the writer even if generation fails
  File file = new File("RoleReconnect-cache.xml");
  try (PrintWriter pw = new PrintWriter(new FileWriter(file), true)) {
    CacheXmlGenerator.generate(getCache(), pw);
  } catch (IOException ex) {
    Assert.fail("IOException during cache.xml generation to " + file, ex);
  }
  closeCache();
  basicGetSystem().disconnect();
  System.out.println("disconnected from the system...");
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);

  // Recreating from the cachexml.
  SerializableRunnable roleLoss = new CacheSerializableRunnable("ROLERECONNECTTESTS") {
    @Override
    public void run2() throws RuntimeException {
      System.out.println("####### STARTING THE REAL TEST ##########");
      locatorPort = locPort;
      dsProperties = null;
      Properties props = getDistributedSystemProperties();
      props.put(CACHE_XML_FILE, xmlFileLoc + fileSeparator + "RoleReconnect-cache.xml");
      props.put(MAX_WAIT_TIME_RECONNECT, "200");
      // number of reconnect attempts configured and expected below
      final int timeReconnect = 3;
      props.put(MAX_NUM_RECONNECT_TRIES, Integer.toString(timeReconnect));
      props.put(LOG_LEVEL, "info");
      getSystem(props);
      addReconnectListener();
      IgnoredException.addIgnoredException("CacheClosedException");
      try {
        getCache(props);
        throw new RuntimeException("The test should throw a CancelException ");
      } catch (CancelException ignor) { // can be caused by role loss during initialization.
        System.out.println("Got Expected CancelException ");
      }
      WaitCriterion ev = new WaitCriterion() {
        @Override
        public boolean done() {
          return reconnectTries >= timeReconnect;
        }

        @Override
        public String description() {
          return "Waiting for reconnect count " + timeReconnect + " currently " + reconnectTries;
        }
      };
      Wait.waitForCriterion(ev, 60 * 1000, 200, true);
      assertEquals(timeReconnect, reconnectTries);
    }
  };
  vm0.invoke(roleLoss);
}
// Number of reconnect attempts observed in this VM; polled by the role-loss tests. The code that
// increments it is outside this view (presumably the listener installed by addReconnectListener -
// confirm).
public static volatile int reconnectTries;
// Set to true by the role-loss runnable once its cache-creation attempt has finished.
public static volatile boolean initialized = false;
// Polled remotely through isInitialRolePlayerStarted(); the code that sets it is outside this view.
public static volatile boolean initialRolePlayerStarted = false;
// public static boolean rPut;
/** Returns the current value of {@code reconnectTries}; used for remote invocation from tests. */
public static int reconnectTries() {
return reconnectTries;
}
/** Returns the current value of {@code initialized}. */
public static boolean isInitialized() {
return initialized;
}
/** Returns the current value of {@code initialRolePlayerStarted}. */
public static boolean isInitialRolePlayerStarted() {
return initialRolePlayerStarted;
}
/**
 * Resets the static reconnect bookkeeping (attempt counter and both flags) before each test by
 * running the reset through {@code Invoke.invokeInEveryVM}.
 */
@Before
public void initStatics() {
  Invoke.invokeInEveryVM(() -> {
    initialRolePlayerStarted = false;
    initialized = false;
    reconnectTries = 0;
  });
}
// NOTE(review): an earlier comment said "See #50944 before enabling the test. This ticket has been
// closed with wontFix for the 2014 8.0 release." The method carries @Test and no ignore annotation
// is visible, so it appears to be enabled - confirm.
/**
 * Verifies that a member which lost a required role reconnects once the role is regained.
 *
 * <p>
 * This VM generates "RoleRegained.xml" describing a replicated region that requires "RoleA" with
 * a RECONNECT loss action. vm1 plays RoleA during cache initialization, vm0 creates its cache
 * from the XML and starts reconnecting after the role is lost, and vm2 then provides RoleA again
 * so that vm0's reconnect can complete.
 */
@Test
public void testReconnectWithRequiredRoleRegained() throws Throwable {
  final String rr1 = "RoleA";
  final String[] requiredRoles = {rr1};
  final String myKey = "MyKey";
  final String myValue = "MyValue";
  final String regionName = "MyRegion";
  final int locPort = locatorPort;

  // CREATE XML FOR MEMBER THAT WILL SEE ROLE LOSS (in this VM)
  beginCacheXml();
  locatorPort = locPort;
  Properties config = getDistributedSystemProperties();
  config.put(ROLES, "");
  config.put(LOG_LEVEL, "info");
  // creating the DS
  getSystem(config);
  MembershipAttributes ra =
      new MembershipAttributes(requiredRoles, RECONNECT, NONE);
  AttributesFactory fac = new AttributesFactory();
  fac.setMembershipAttributes(ra);
  fac.setScope(DISTRIBUTED_ACK);
  fac.setDataPolicy(REPLICATE);
  RegionAttributes attr = fac.create();
  createRootRegion(regionName, attr);

  // writing the cachexml file; try-with-resources closes the writer even if generation fails
  File file = new File("RoleRegained.xml");
  try (PrintWriter pw = new PrintWriter(new FileWriter(file), true)) {
    CacheXmlGenerator.generate(getCache(), pw);
  } catch (IOException ex) {
    Assert.fail("IOException during cache.xml generation to " + file, ex);
  }
  closeCache();
  getSystem().disconnect();

  Host host = getHost(0);
  final VM vm0 = host.getVM(0);
  final VM vm1 = host.getVM(1);
  vm0.invoke(new CacheSerializableRunnable("reset reconnect count") {
    @Override
    public void run2() throws CacheException {
      reconnectTries = 0;
    }
  });

  // vm1 plays RoleA while vm0 initializes
  SerializableRunnable roleAPlayerForCacheInitialization =
      getRoleAPlayerForCacheInitializationRunnable(vm0, locPort, regionName,
          "starting roleAplayer, which will initialize, wait for "
              + "vm0 to initialize, and then close its cache to cause role loss");
  AsyncInvocation avkVm1 = vm1.invokeAsync(roleAPlayerForCacheInitialization);

  // vm0 creates its cache from the XML and is the member that sees the role loss
  CacheSerializableRunnable roleLoss =
      getRoleLossRunnable(vm1, locPort, regionName, myKey, myValue,
          "starting role loss vm. When the role is lost it will start" + " trying to reconnect",
          file.getAbsolutePath());
  final AsyncInvocation roleLossAsync = vm0.invokeAsync(roleLoss);

  System.out.println("waiting for role loss vm to start reconnect attempts");
  WaitCriterion ev = new WaitCriterion() {
    @Override
    public boolean done() {
      // stop waiting if the role-loss invocation has already ended
      if (!roleLossAsync.isAlive()) {
        return true;
      }
      int tries = vm0.invoke(() -> ReconnectDUnitTest.reconnectTries());
      return tries != 0;
    }

    @Override
    public String description() {
      return "waiting for event";
    }
  };
  GeodeAwaitility.await().untilAsserted(ev);

  VM vm2 = host.getVM(2);
  if (roleLossAsync.isAlive()) {
    SerializableRunnable roleAPlayer = getRoleAPlayerRunnable(locPort, regionName, myKey, myValue,
        "starting roleAPlayer in a different vm."
            + " After this reconnect should succeed in vm0");
    vm2.invoke(roleAPlayer);
    System.out.println("waiting for vm0 to finish reconnecting");
    ThreadUtils.join(roleLossAsync, 120 * 1000);
  }
  if (roleLossAsync.getException() != null) {
    throw roleLossAsync.getException();
  }
  join(avkVm1, 30 * 1000);
  if (avkVm1.getException() != null) {
    Assert.fail("Exception in Vm1", avkVm1.getException());
  }
}
/**
 * Builds the runnable executed by the member that is expected to lose its required role.
 *
 * <p>
 * The runnable waits until {@code otherVM} reports that the initial role player has started,
 * creates the system and cache from the given cache XML, waits for at least one reconnect
 * attempt, and then waits until the reconnect has completed and the region contains the expected
 * entry. Finally it disconnects the cache's distributed system.
 *
 * @param otherVM VM polled through {@code isInitialRolePlayerStarted()}
 * @param locPort locator port stored in {@code locatorPort} before the properties are read
 * @param regionName name of the region to look up after the reconnect
 * @param myKey key expected in the region
 * @param myValue value expected for {@code myKey}
 * @param startupMessage message printed when the runnable starts
 * @param xmlFilePath value used for the cache-xml-file property
 * @return the runnable to invoke in the role-loss VM
 */
private CacheSerializableRunnable getRoleLossRunnable(final VM otherVM, final int locPort,
    final String regionName, final String myKey, final Object myValue,
    final String startupMessage, final String xmlFilePath) {
  return new CacheSerializableRunnable("roleloss runnable") {
    @Override
    public void run2() {
      System.out.println(startupMessage);
      WaitCriterion ev = new WaitCriterion() {
        @Override
        public boolean done() {
          return otherVM.invoke(() -> ReconnectDUnitTest.isInitialRolePlayerStarted())
              .booleanValue();
        }

        @Override
        public String description() {
          // a real message instead of null so that a timeout is diagnosable
          return "waiting for the initial role player to start in the other VM";
        }
      };
      await().untilAsserted(ev);
      System.out.println(
          "Starting the test and creating the cache and regions etc ..." + System.getenv("PWD"));
      locatorPort = locPort;
      Properties props = getDistributedSystemProperties();
      props.put(CACHE_XML_FILE, xmlFilePath);
      props.put(MAX_WAIT_TIME_RECONNECT, "3000");
      props.put(MAX_NUM_RECONNECT_TRIES, "8");
      props.put(LOG_LEVEL, "info");
      getSystem(props);
      IgnoredException.addIgnoredException("CacheClosedException");
      try {
        getCache();
      } catch (CancelException e) {
        // can happen if RoleA goes away during initialization
        System.out.println("cache threw CancelException while creating the cache");
      }
      initialized = true;
      addReconnectListener();
      // first wait for at least one reconnect attempt ...
      await().until(() -> reconnectTries != 0);
      // ... then for the reconnect to finish and the expected entry to be present
      await().until(() -> {
        String excuse = "none";
        if (InternalDistributedSystem.getReconnectAttemptCounter() != 0) {
          System.out.println("reconnectAttemptCounter is "
              + InternalDistributedSystem.getReconnectAttemptCounter()
              + " waiting for it to be zero");
          return false;
        }
        Object key = null;
        Object value = null;
        Region.Entry keyValue = null;
        try {
          if (cache == null) {
            excuse = "no cache";
            return false;
          }
          Region myRegion = cache.getRegion(regionName);
          if (myRegion == null) {
            excuse = "no region";
            return false;
          }
          // keeps the last entry returned by the iterator; the checks below apply to that entry
          Set keyValuePair = myRegion.entrySet();
          Iterator it = keyValuePair.iterator();
          while (it.hasNext()) {
            keyValue = (Region.Entry) it.next();
            key = keyValue.getKey();
            value = keyValue.getValue();
          }
          if (key == null) {
            excuse = "key is null";
            return false;
          }
          if (!myKey.equals(key)) {
            excuse = "key is wrong";
            return false;
          }
          if (value == null) {
            excuse = "value is null";
            return false;
          }
          if (!myValue.equals(value)) {
            excuse = "value is wrong";
            return false;
          }
          System.out.println("All assertions passed");
          System.out.println("MyKey : " + key + " and myvalue : " + value);
          return true;
        } catch (CancelException ecc) {
          // ignore the exception because the cache can be closed/null some times
          // while in reconnect.
        } catch (RegionDestroyedException rex) {
          // deliberately ignored: treated as "not ready yet" so the condition is polled again
        } finally {
          System.out.println("waiting for reconnect. Current status is '" + excuse + "'");
        }
        return false;
      });
      if (cache != null) {
        cache.getDistributedSystem().disconnect();
      }
    }
  }; // roleloss runnable
}
/**
 * Auto-reconnect was found to stop attempting to reconnect and rebuild the cache if another
 * forced-disconnect was triggered after reconnect but before cache creation was completed.
 *
 * <p>
 * The test installs a {@link CacheListenerTriggeringForcedDisconnect} on the region, forces a
 * disconnect of vm1, and then asserts in vm1 that the cache starts reconnecting and that a
 * reconnected cache is available once {@code waitUntilReconnected} returns.
 */
@Test
public void testReconnectFailsInCacheCreation() throws Exception {
  final Host dunitHost = Host.getHost(0);
  final VM quorumVm = dunitHost.getVM(0);
  final VM crashingVm = dunitHost.getVM(1);
  final int port = locatorPort;

  SerializableRunnable cacheCreator = new SerializableRunnable("Create Cache and Regions") {
    @Override
    public void run() {
      locatorPort = port;
      final Properties config = getDistributedSystemProperties();
      config.put(MAX_WAIT_TIME_RECONNECT, "1000");
      dsProperties = config;
      ReconnectDUnitTest.savedSystem = getSystem(config);
      ReconnectDUnitTest.savedCache = (GemFireCacheImpl) getCache();
      Region region = createRegion("myRegion", createAtts());
      region.put("MyKey", "MyValue");
      // the listener is added only after the region exists and holds its entry
      region.getAttributesMutator()
          .addCacheListener(new CacheListenerTriggeringForcedDisconnect());
    }
  };

  // vm0 keeps the locator from losing quorum when vm1 crashes
  quorumVm.invoke(cacheCreator);
  crashingVm.invoke(cacheCreator);

  IgnoredException.addIgnoredException(
      "DistributedSystemDisconnectedException|ForcedDisconnectException", crashingVm);
  forceDisconnect(crashingVm);

  crashingVm.invoke(new SerializableRunnable("wait for reconnect") {
    @Override
    public void run() {
      final GemFireCacheImpl crashedCache = savedCache;
      final WaitCriterion reconnectStarted = new WaitCriterion() {
        @Override
        public boolean done() {
          return crashedCache.isReconnecting();
        }

        @Override
        public String description() {
          return "waiting for cache to begin reconnecting";
        }
      };
      GeodeAwaitility.await().untilAsserted(reconnectStarted);

      System.out.println("entering reconnect wait for " + crashedCache);
      try {
        crashedCache.waitUntilReconnected(5, MINUTES);
      } catch (InterruptedException e) {
        fail("interrupted");
      }
      assertNotNull(crashedCache.getReconnectedCache());
    }
  });
}
/**
 * GEODE-2155: auto-reconnect fails with an NPE when a cache listener's {@code Declarable.init}
 * method throws an exception.
 *
 * <p>
 * The cache is created in vm0 and in the JVM running this test, with a
 * {@link ListenerWhoseInitMethodAlwaysThrows} added to the region. After the local system is
 * crashed, {@code waitUntilReconnected} is expected to throw a {@code CacheClosedException}
 * caused by a {@code DistributedSystemDisconnectedException}, the reconnect must be reported as
 * cancelled, and no reconnected cache may exist.
 */
@Test
public void testReconnectFailsDueToBadCacheXML() throws Exception {
  final Host dunitHost = getHost(0);
  final VM quorumVm = dunitHost.getVM(0);
  final VM secondVm = dunitHost.getVM(1);
  final int port = locatorPort;

  SerializableRunnable cacheCreator = new SerializableRunnable("Create Cache and Regions") {
    @Override
    public void run() {
      locatorPort = port;
      final Properties config = getDistributedSystemProperties();
      config.put(MAX_WAIT_TIME_RECONNECT, "1000");
      dsProperties = config;
      savedSystem = getSystem(config);
      savedCache = (GemFireCacheImpl) getCache();
      Region region = createRegion("myRegion", createAtts());
      region.put("MyKey", "MyValue");
      region.getAttributesMutator().addCacheListener(new ListenerWhoseInitMethodAlwaysThrows());
    }
  };

  // vm0 keeps the locator from losing quorum when the other member crashes
  quorumVm.invoke(cacheCreator);
  // the second cache is created directly in the JVM running this test method
  cacheCreator.run();

  addIgnoredException(
      "DistributedSystemDisconnectedException|ForcedDisconnectException", secondVm);
  // a null VM makes forceDisconnect crash the system in the current JVM
  forceDisconnect(null);

  final GemFireCacheImpl crashedCache = savedCache;
  final WaitCriterion reconnectStartedOrCancelled = new WaitCriterion() {
    @Override
    public boolean done() {
      if (crashedCache.isReconnecting()) {
        return true;
      }
      return crashedCache.getInternalDistributedSystem().isReconnectCancelled();
    }

    @Override
    public String description() {
      return "waiting for cache to begin reconnecting";
    }
  };
  GeodeAwaitility.await().untilAsserted(reconnectStartedOrCancelled);

  assertThatThrownBy(
      () -> crashedCache.waitUntilReconnected(getTimeout().getValueInMS(), MILLISECONDS))
          .isInstanceOf(CacheClosedException.class)
          .hasMessageContaining("Cache could not be recreated")
          .hasCauseExactlyInstanceOf(DistributedSystemDisconnectedException.class);
  assertTrue(crashedCache.getInternalDistributedSystem().isReconnectCancelled());
  assertNull(crashedCache.getReconnectedCache());
}
/**
 * Builds the runnable for the second "RoleA" player.
 *
 * <p>
 * The runnable connects with {@code ROLES} set to {@code "RoleA"}, creates a root region with
 * {@code DISTRIBUTED_ACK} scope and {@code REPLICATE} data policy, puts
 * {@code myKey -> myValue} into it after a short pause, and then stays around for five more
 * seconds before finishing.
 *
 * @param locPort value assigned to {@code locatorPort} before the properties are built
 * @param regionName name of the root region to create
 * @param myKey key put into the region
 * @param myValue value put into the region
 * @param startupMessage message printed when the runnable starts
 * @return the runnable to invoke in the VM that plays RoleA
 */
private CacheSerializableRunnable getRoleAPlayerRunnable(final int locPort,
    final String regionName, final String myKey, final String myValue,
    final String startupMessage) {
  return new CacheSerializableRunnable("second RoleA player") {
    @Override
    public void run2() throws CacheException {
      System.out.println(startupMessage);
      locatorPort = locPort;
      Properties props = getDistributedSystemProperties();
      props.put(LOG_LEVEL, "info");
      props.put(ROLES, "RoleA");
      getSystem(props);
      getCache();
      AttributesFactory fac = new AttributesFactory();
      fac.setScope(Scope.DISTRIBUTED_ACK);
      fac.setDataPolicy(DataPolicy.REPLICATE);
      RegionAttributes attr = fac.create();
      Region region = createRootRegion(regionName, attr);
      System.out.println("STARTED THE REQUIREDROLES CACHE");
      try {
        Thread.sleep(120);
      } catch (InterruptedException ee) {
        // only an interrupt is reported as "interrupted"; anything else propagates unchanged
        fail("interrupted");
      }
      region.put(myKey, myValue);
      try {
        Thread.sleep(5000); // why are we sleeping for 5 seconds here?
        // if it is to give time to avkVm0 to notice us we should have
        // avkVm0 signal us that it has seen us and then we can exit.
      } catch (InterruptedException ee) {
        fail("interrupted");
      }
      System.out.println("RolePlayer is done...");
    }
  };
}
/**
 * Builds the runnable for the first "RoleA" player, which is present while the other VM
 * initializes its cache.
 *
 * <p>
 * The runnable connects with {@code ROLES} set to {@code "RoleA"}, creates a root region with
 * {@code DISTRIBUTED_ACK} scope and {@code REPLICATE} data policy, sets
 * {@code initialRolePlayerStarted}, polls {@code otherVM} every 15 ms until
 * {@code ReconnectDUnitTest.isInitialized()} returns true there, and then closes its cache.
 *
 * @param otherVM VM polled for {@code isInitialized()}
 * @param locPort value assigned to {@code locatorPort} before the properties are built
 * @param regionName name of the root region to create
 * @param startupMessage message printed when the runnable starts
 * @return the runnable to invoke in the VM that plays RoleA first
 */
private CacheSerializableRunnable getRoleAPlayerForCacheInitializationRunnable(final VM otherVM,
    final int locPort, final String regionName, final String startupMessage) {
  return new CacheSerializableRunnable("first RoleA player") {
    @Override
    public void run2() throws CacheException {
      System.out.println(startupMessage);
      locatorPort = locPort;

      Properties config = getDistributedSystemProperties();
      config.put(LOG_LEVEL, "info");
      config.put(ROLES, "RoleA");
      getSystem(config);
      getCache();

      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.REPLICATE);
      RegionAttributes regionAttributes = factory.create();
      createRootRegion(regionName, regionAttributes);
      System.out.println("STARTED THE REQUIREDROLES CACHE");
      initialRolePlayerStarted = true;

      // keep this member alive until the other VM reports that it has initialized
      while (true) {
        final boolean otherVmInitialized =
            otherVM.invoke(() -> ReconnectDUnitTest.isInitialized()).booleanValue();
        if (otherVmInitialized) {
          break;
        }
        try {
          Thread.sleep(15);
        } catch (InterruptedException interrupted) {
          fail("interrupted");
        }
      }

      System.out.println("RoleAPlayerInitializer is done...");
      closeCache();
    }
  };
}
/**
 * Resets {@code reconnectTries} and registers a {@code ReconnectListener} with
 * {@code InternalDistributedSystem}. The listener increments {@code reconnectTries} each time
 * {@code reconnecting} is called and stores the new system's cache in {@code cache} when
 * {@code onReconnect} is called.
 */
void addReconnectListener() {
  // start counting from zero for the listener registered below
  reconnectTries = 0;
  System.out.println("adding reconnect listener");
  InternalDistributedSystem.addReconnectListener(new ReconnectListener() {
    @Override
    public void reconnecting(InternalDistributedSystem oldSystem) {
      System.out.println("reconnect listener invoked");
      reconnectTries++;
    }

    @Override
    public void onReconnect(InternalDistributedSystem oldSystem,
        InternalDistributedSystem newSystem) {
      System.out.println("reconnect listener onReconnect invoked " + newSystem);
      cache = newSystem.getCache();
    }
  });
}
/**
 * Pauses the calling thread for 500 milliseconds.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void waitTimeout() throws InterruptedException {
  MILLISECONDS.sleep(500);
}
/**
 * Crashes the distributed system of {@code cache}, either in the given VM or, when {@code vm}
 * is {@code null}, in the current JVM. If a locator is present in that JVM, the call also waits
 * until the crashed system is reconnecting or has a reconnected system.
 *
 * @param vm the VM in which to crash the system, or {@code null} to do it locally
 * @return the value produced by the crash callable, which returns {@code true}
 * @throws Exception if the crash callable throws
 */
public boolean forceDisconnect(VM vm) throws Exception {
  SerializableCallable crashTask = new SerializableCallable("crash distributed system") {
    @Override
    public Object call() throws Exception {
      // since the system will disconnect and attempt to reconnect
      // a new system the old reference to DTC.system can cause
      // trouble, so we first null it out.
      nullSystem();
      final Locator locatorBeforeCrash = Locator.getLocator();
      final DistributedSystem crashedSystem = cache.getDistributedSystem();
      MembershipManagerHelper.crashDistributedSystem(crashedSystem);

      if (locatorBeforeCrash == null) {
        // no locator in this JVM, nothing further to wait for
        return true;
      }

      WaitCriterion reconnectStarted = new WaitCriterion() {
        @Override
        public boolean done() {
          if (crashedSystem.isReconnecting()) {
            return true;
          }
          return crashedSystem.getReconnectedSystem() != null;
        }

        @Override
        public String description() {
          return "waiting for locator to start reconnecting: " + locatorBeforeCrash;
        }
      };
      GeodeAwaitility.await().untilAsserted(reconnectStarted);
      return true;
    }
  };

  if (vm == null) {
    return (Boolean) crashTask.call();
  }
  return (Boolean) vm.invoke(crashTask);
}
/**
 * A cache listener whose {@code init} method throws a {@code RuntimeException} every time it is
 * called. Because init is always called during cache.xml parsing, this listener cannot be used
 * from cache.xml.
 */
public static class ListenerWhoseInitMethodAlwaysThrows extends CacheListenerAdapter {

  /**
   * Always fails.
   *
   * @param props ignored
   * @throws RuntimeException unconditionally
   */
  @Override
  public void init(Properties props) {
    throw new RuntimeException("Cause parsing to fail");
  }
}
/**
 * A cache listener that crashes the distributed system the first time
 * {@code afterRegionCreate} is invoked, i.e. while {@link #crashCount} is still zero. Later
 * notifications are ignored.
 */
public static class CacheListenerTriggeringForcedDisconnect extends CacheListenerAdapter
    implements Declarable {

  /** Number of crashes triggered so far; a crash is only triggered while this is zero. */
  public static int crashCount = 0;

  @Override
  public void afterRegionCreate(final RegionEvent event) {
    if (crashCount != 0) {
      // already crashed once, ignore this notification
      return;
    }
    crashCount += 1;

    // we crash the system in a different thread than the ReconnectThread
    // to simulate receiving a ForcedDisconnect from the membership manager
    // in the UDP reader thread
    final Thread crasher = new Thread(() -> {
      System.out.println("crashing distributed system");
      GemFireCacheImpl owningCache = (GemFireCacheImpl) event.getRegion().getCache();
      MembershipManagerHelper.crashDistributedSystem(owningCache.getDistributedSystem());
    }, "crash reconnecting system (ReconnectDUnitTest)");
    crasher.setDaemon(true);
    crasher.start();
  }

  /** Does nothing; this listener takes no configuration. */
  @Override
  public void init(Properties props) {
    // intentionally empty
  }
}
/**
 * Extension hook with an empty body in this class; being protected and non-final, it can be
 * overridden by subclasses.
 *
 * <p>
 * NOTE(review): presumably called while the distributed-system properties are assembled so a
 * subclass can add entries to {@code p} — the caller is not visible here, confirm.
 *
 * @param p the properties passed to the hook; not touched by this implementation
 */
protected void addDSProps(Properties p) {
  // intentionally empty
}
}