/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.distributed.internal;
import java.net.InetAddress;
import java.util.Properties;
import junit.framework.Assert;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.admin.AdminDistributedSystem;
import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
import com.gemstone.gemfire.admin.Alert;
import com.gemstone.gemfire.admin.AlertLevel;
import com.gemstone.gemfire.admin.AlertListener;
import com.gemstone.gemfire.admin.DistributedSystemConfig;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.org.jgroups.protocols.pbcast.GMS;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* This class tests the functionality of the {@link
* DistributionManager} class.
*/
public class DistributionManagerDUnitTest extends DistributedTestCase {
private static final Logger logger = LogService.getLogger();
/**
* Clears the exceptionInThread flag in the given distribution
* manager.
*/
public static void clearExceptionInThreads(DistributionManager dm) {
dm.clearExceptionInThreads();
}
public DistributionManagerDUnitTest(String name) {
super(name);
}
@Override
public void setUp() throws Exception {
disconnectAllFromDS();
super.setUp();
}
//////// Test Methods
protected static class ItsOkayForMyClassNotToBeFound
extends SerialDistributionMessage {
public int getDSFID() {
return NO_FIXED_ID;
}
@Override
protected void process(DistributionManager dm) {
// We should never get here
}
};
public static DistributedSystem ds;
public void testGetDistributionVMType() {
DM dm = getSystem().getDistributionManager();
InternalDistributedMember ipaddr = dm.getId();
assertEquals(DistributionManager.NORMAL_DM_TYPE, ipaddr.getVmKind());
}
/**
* Send the distribution manager a message it can't deserialize
*/
public void testExceptionInThreads() throws InterruptedException {
DM dm = getSystem().getDistributionManager();
String p1 = "ItsOkayForMyClassNotToBeFound";
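// The ExpectedException add/remove markers below tell the dunit log scanner to
// ignore the deserialization failure this message is expected to provoke.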
logger.info("<ExpectedException action=add>" + p1 + "</ExpectedException>");
DistributionMessage m = new ItsOkayForMyClassNotToBeFound();
dm.putOutgoing(m);
Thread.sleep(1 * 1000);
logger.info("<ExpectedException action=remove>" + p1 + "</ExpectedException>");
// assertTrue(dm.exceptionInThreads());
// dm.clearExceptionInThreads();
// assertTrue(!dm.exceptionInThreads());
}
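// Note: the leading underscore keeps JUnit 3 from picking this method up as a
// test (test method names must start with "test"), effectively disabling it.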
public void _testGetDistributionManagerIds() {
int systemCount = 0;
for (int h = 0; h < Host.getHostCount(); h++) {
Host host = Host.getHost(h);
systemCount += host.getSystemCount();
}
DM dm = getSystem().getDistributionManager();
systemCount += 1;
assertEquals(systemCount, dm.getNormalDistributionManagerIds().size());
}
/**
* Demonstrate that a member can reconnect on the same UDP port it was
* shunned on when that port is pinned via the gemfire.jg-bind-port
* system property
*/
public void testConnectAfterBeingShunned() {
InternalDistributedSystem sys = getSystem();
JGroupMembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys);
InternalDistributedMember idm = mgr.getLocalMember();
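// Pin the JGroups bind port to the current member's port so the reconnect
// below is forced to reuse the port that is about to be shunned.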
System.setProperty("gemfire.jg-bind-port", ""+idm.getPort());
try {
sys.disconnect();
sys = getSystem();
mgr = MembershipManagerHelper.getMembershipManager(sys);
sys.disconnect();
InternalDistributedMember idm2 = mgr.getLocalMember();
getLogWriter().info("original ID=" + idm +
" and after connecting=" + idm2);
assertTrue("should not have used a different udp port",
idm.getPort() == idm2.getPort());
} finally {
System.getProperties().remove("gemfire.jg-bind-port");
}
}
/**
* Test the handling of "surprise members" in the membership manager.
* Create a DistributedSystem in this VM and then add a fake member to
* its surpriseMember set. Then ensure that it stays in the set when
* a new membership view arrives that doesn't contain it. Then wait
* until the member should be gone and force more view processing to have
* it scrubbed from the set.
**/
public void testSurpriseMemberHandling() {
VM vm0 = Host.getHost(0).getVM(0);
InternalDistributedSystem sys = getSystem();
JGroupMembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys);
try {
InternalDistributedMember mbr = new InternalDistributedMember(
DistributedTestCase.getIPLiteral(), 12345);
// first make sure we can't add this as a surprise member (bug #44566)
// if the view number isn't being recorded correctly the test will pass but the
// functionality is broken
Assert.assertTrue("expected view ID to be greater than zero", mgr.getView().getViewNumber() > 0);
int oldViewId = mbr.getVmViewId();
mbr.setVmViewId((int)mgr.getView().getViewNumber()-1);
getLogWriter().info("current membership view is " + mgr.getView());
getLogWriter().info("created ID " + mbr + " with view ID " + mbr.getVmViewId());
sys.getLogWriter().info("<ExpectedException action=add>attempt to add old member</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=add>Removing shunned GemFire node</ExpectedException>");
try {
boolean accepted = mgr.addSurpriseMember(mbr, new Stub());
Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted);
} finally {
sys.getLogWriter().info("<ExpectedException action=remove>attempt to add old member</ExpectedException>");
sys.getLogWriter().info("<ExpectedException action=remove>Removing shunned GemFire node</ExpectedException>");
}
mbr.setVmViewId(oldViewId);
// now forcibly add it as a surprise member and show that it is reaped
long gracePeriod = 5000;
long startTime = System.currentTimeMillis();
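// Backdate the member's birth time so that it is already within gracePeriod
// milliseconds of exceeding the surprise-member timeout; after sleeping for
// gracePeriod below, the next view change should reap it.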
long birthTime = startTime - mgr.getSurpriseMemberTimeout() + gracePeriod;
MembershipManagerHelper.addSurpriseMember(sys, mbr, birthTime);
assertTrue("Member was not a surprise member", mgr.isSurpriseMember(mbr));
// force a real view change
SerializableRunnable connectDisconnect = new SerializableRunnable() {
public void run() {
getSystem().disconnect();
}
};
vm0.invoke(connectDisconnect);
if (birthTime < (System.currentTimeMillis() - mgr.getSurpriseMemberTimeout())) {
return; // machine is too busy and we didn't get enough CPU to perform more assertions
}
assertTrue("Member was incorrectly removed from surprise member set",
mgr.isSurpriseMember(mbr));
try {
Thread.sleep(gracePeriod);
}
catch (InterruptedException e) {
fail("test was interrupted");
}
vm0.invoke(connectDisconnect);
assertTrue("Member was not removed from surprise member set",
!mgr.isSurpriseMember(mbr));
}
catch (java.net.UnknownHostException e) {
fail("unable to resolve localhost - test needs some attention");
}
finally {
if (sys != null && sys.isConnected()) {
sys.disconnect();
}
}
}
/**
* Test the verifyMember interface of the membership manager
**/
public void testVerifyMember() {
VM memberVM = Host.getHost(0).getVM(1);
int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
final String locators = getIPLiteral() + "[" + port1 + "]";
final Properties properties = new Properties();
properties.put(DistributionConfig.MCAST_PORT_NAME, "0");
properties.put(DistributionConfig.START_LOCATOR_NAME, locators);
properties.put(DistributionConfig.DISABLE_AUTO_RECONNECT_NAME, "true");
properties.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
system = (InternalDistributedSystem)DistributedSystem.connect(properties);
JGroupMembershipManager mgr = MembershipManagerHelper.getMembershipManager(system);
try {
properties.remove(DistributionConfig.START_LOCATOR_NAME);
properties.put(DistributionConfig.LOCATORS_NAME, locators);
properties.put(DistributionConfig.NAME_NAME, "John Doe");
DistributedMember id = (DistributedMember)memberVM.invoke(new SerializableCallable() {
public Object call() {
DistributedSystem system = DistributedSystem.connect(properties);
return system.getDistributedMember();
}
});
assertTrue(id.getName().equals("John Doe"));
getLogWriter().info("test is setting slow view casting hook");
// a new view won't be installed for 20 seconds
GMS.TEST_HOOK_SLOW_VIEW_CASTING=20;
// show we can reconnect even though the old ID for this member is still
// in the membership view. Disconnecting will shut down the old DistributedSystem
// in memberVM, and the reconnect happens while the old ID is still in the view
memberVM.invoke(new SerializableRunnable("disconnect and reconnect") {
public void run() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
DistributedMember oldID = sys.getDistributedMember();
crashDistributedSystem(sys);
sys = DistributedSystem.connect(properties);
getLogWriter().info("initial view in new system is " +
MembershipManagerHelper.getMembershipManager(sys).getView());
// the old ID should be in the new view
if (MembershipManagerHelper.getMembershipManager(sys)
.memberExists((InternalDistributedMember)oldID)) {
// the old DistributedSystem should not be reachable
assertFalse(MembershipManagerHelper.getMembershipManager(sys)
.verifyMember(oldID, "old member has disconnected for this test"));
}
}
});
}
finally {
GMS.TEST_HOOK_SLOW_VIEW_CASTING = 0;
memberVM.invoke(new SerializableRunnable("disconnect") {
public void run() {
DistributedSystem sys = InternalDistributedSystem.getAnyInstance();
if (sys != null) {
sys.disconnect();
}
}
});
if (system != null && system.isConnected()) {
system.disconnect();
}
}
}
/**
* vm1 stores its cache in this static variable in
* testAckSevereAlertThreshold
*/
static Cache myCache;
/**
* Tests that a severe-level alert is generated if a member does not respond
* with an ack quickly enough. vm0 and vm1 create a region and set
* ack-severe-alert-threshold. vm1 has a cache listener in its
* region that sleeps when notified, forcing the operation to take longer
* than ack-wait-threshold + ack-severe-alert-threshold
*/
// DISABLED due to a high rate of failure in CI test runs. See internal
// ticket #52319
public void disabledtestAckSevereAlertThreshold() throws Exception {
disconnectAllFromDS();
Host host = Host.getHost(0);
// VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// in order to set a small ack-wait-threshold, we have to remove the
// system property established by the dunit harness
String oldAckWait = (String)System.getProperties()
.remove("gemfire." + DistributionConfig.ACK_WAIT_THRESHOLD_NAME);
try {
final Properties props = getDistributedSystemProperties();
props.setProperty("mcast-port", "0");
props.setProperty(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "3");
props.setProperty(DistributionConfig.ACK_SEVERE_ALERT_THRESHOLD_NAME, "3");
props.setProperty(DistributionConfig.NAME_NAME, "putter");
getSystem(props);
Region rgn = (new RegionFactory())
.setScope(Scope.DISTRIBUTED_ACK)
.setEarlyAck(false)
.setDataPolicy(DataPolicy.REPLICATE)
.create("testRegion");
vm1.invoke(new SerializableRunnable("Connect to distributed system") {
public void run() {
props.setProperty(DistributionConfig.NAME_NAME, "sleeper");
getSystem(props);
addExpectedException("elapsed while waiting for replies");
RegionFactory rf = new RegionFactory();
Region r = rf.setScope(Scope.DISTRIBUTED_ACK)
.setDataPolicy(DataPolicy.REPLICATE)
.setEarlyAck(false)
.addCacheListener(getSleepingListener(false))
.create("testRegion");
myCache = r.getCache();
try {
createAlertListener();
}
catch (Exception e) {
throw new RuntimeException("failed to create alert listener", e);
}
}
});
// now we have two caches set up. vm1 has a listener that will sleep
// and cause the severe-alert threshold to be crossed
rgn.put("bomb", "pow!"); // this will hang until vm1 responds
rgn.getCache().close();
system.disconnect();
vm1.invoke(new SerializableRunnable("disconnect from ds") {
public void run() {
if (!myCache.isClosed()) {
if (system.isConnected()) {
system.disconnect();
}
myCache = null;
}
if (system.isConnected()) {
system.disconnect();
}
synchronized(alertGuard) {
assertTrue(alertReceived);
}
}
});
}
finally {
if (oldAckWait != null) {
System.setProperty("gemfire." + DistributionConfig.ACK_WAIT_THRESHOLD_NAME, oldAckWait);
}
}
}
static volatile boolean regionDestroyedInvoked;
static CacheListener getSleepingListener(final boolean playDead) {
regionDestroyedInvoked = false;
return new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
try {
if (playDead) {
MembershipManagerHelper.playDead(system);
}
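// Sleep well past ack-wait-threshold + ack-severe-alert-threshold (3+3 or
// 5+5 seconds in the tests that install this listener) so the waiting member
// escalates to a severe alert or forces this member out.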
Thread.sleep(15000);
}
catch (InterruptedException ie) {
fail("interrupted");
}
}
@Override
public void afterRegionDestroy(RegionEvent event) {
LogWriter logger = myCache.getLogger();
logger.info("afterRegionDestroyed invoked in sleeping listener");
logger.info("<ExpectedException action=remove>service failure</ExpectedException>");
logger.info("<ExpectedException action=remove>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
regionDestroyedInvoked = true;
}
};
}
static AdminDistributedSystem adminSystem;
static Object alertGuard = new Object();
static boolean alertReceived;
static void createAlertListener() throws Exception {
DistributedSystemConfig config =
AdminDistributedSystemFactory.defineDistributedSystem(system, null);
adminSystem =
AdminDistributedSystemFactory.getDistributedSystem(config);
adminSystem.setAlertLevel(AlertLevel.SEVERE);
adminSystem.addAlertListener(new AlertListener() {
public void alert(Alert alert) {
try {
logger.info("alert listener invoked for alert originating in " + alert.getConnectionName());
logger.info(" alert text = " + alert.getMessage());
logger.info(" systemMember = " + alert.getSystemMember());
}
catch (Exception e) {
logger.fatal("exception trying to use alert object", e);
}
synchronized(alertGuard) {
alertReceived = true;
}
}
});
adminSystem.connect();
assertTrue(adminSystem.waitToBeConnected(5 * 1000));
}
/**
* Tests that a sick member is kicked out
*/
public void testKickOutSickMember() throws Exception {
disconnectAllFromDS();
addExpectedException("10 seconds have elapsed while waiting");
Host host = Host.getHost(0);
// VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// in order to set a small ack-wait-threshold, we have to remove the
// system property established by the dunit harness
String oldAckWait = (String)System.getProperties()
.remove("gemfire." + DistributionConfig.ACK_WAIT_THRESHOLD_NAME);
try {
final Properties props = getDistributedSystemProperties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); // loner
props.setProperty(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "5");
props.setProperty(DistributionConfig.ACK_SEVERE_ALERT_THRESHOLD_NAME, "5");
props.setProperty(DistributionConfig.NAME_NAME, "putter");
getSystem(props);
Region rgn = (new RegionFactory())
.setScope(Scope.DISTRIBUTED_ACK)
.setDataPolicy(DataPolicy.REPLICATE)
.create("testRegion");
system.getLogWriter().info("<ExpectedException action=add>sec have elapsed while waiting for replies</ExpectedException>");
vm1.invoke(new SerializableRunnable("Connect to distributed system") {
public void run() {
props.setProperty(DistributionConfig.NAME_NAME, "sleeper");
getSystem(props);
LogWriter log = system.getLogWriter();
log.info("<ExpectedException action=add>service failure</ExpectedException>");
log.info("<ExpectedException action=add>com.gemstone.gemfire.ForcedDisconnectException</ExpectedException>");
RegionFactory rf = new RegionFactory();
Region r = rf.setScope(Scope.DISTRIBUTED_ACK)
.setDataPolicy(DataPolicy.REPLICATE)
.addCacheListener(getSleepingListener(true))
.create("testRegion");
myCache = r.getCache();
}
});
// now we have two caches set up, each having an alert listener. Vm1
// also has a cache listener that will turn off its ability to respond
// to "are you dead" messages and then sleep
rgn.put("bomb", "pow!");
rgn.getCache().close();
system.getLogWriter().info("<ExpectedException action=remove>sec have elapsed while waiting for replies</ExpectedException>");
system.disconnect();
vm1.invoke(new SerializableRunnable("wait for forced disconnect") {
public void run() {
// wait a while for the DS to finish disconnecting
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return !system.isConnected();
}
public String description() {
return "waiting for the sick member's distributed system to disconnect";
}
};
// if this fails it means the sick member wasn't kicked out and something is wrong
DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
ev = new WaitCriterion() {
public boolean done() {
return myCache.isClosed();
}
public String description() {
return "waiting for vm1's cache to close";
}
};
DistributedTestCase.waitForCriterion(ev, 20 * 1000, 200, false);
if (!myCache.isClosed()) {
if (system.isConnected()) {
system.disconnect();
}
myCache = null;
throw new RuntimeException("Test Failed - vm1's cache is not closed");
}
if (system.isConnected()) {
system.disconnect();
throw new RuntimeException("Test Failed - vm1's system should have been disconnected");
}
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return regionDestroyedInvoked;
}
public String description() {
return "vm1's listener should have received afterRegionDestroyed notification";
}
};
DistributedTestCase.waitForCriterion(wc, 30 * 1000, 1000, true);
}
});
}
finally {
if (oldAckWait != null) {
System.setProperty("gemfire." + DistributionConfig.ACK_WAIT_THRESHOLD_NAME, oldAckWait);
}
}
}
/**
* test use of a bad bind-address for bug #32565
* @throws Exception
*/
public void testBadBindAddress() throws Exception {
disconnectAllFromDS();
final Properties props = getDistributedSystemProperties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); // loner
// use a valid address that's not proper for this machine
props.setProperty(DistributionConfig.BIND_ADDRESS_NAME, "www.yahoo.com");
props.setProperty(DistributionConfig.ACK_WAIT_THRESHOLD_NAME, "5");
props.setProperty(DistributionConfig.ACK_SEVERE_ALERT_THRESHOLD_NAME, "5");
try {
getSystem(props);
} catch (IllegalArgumentException e) {
getLogWriter().info("caught expected exception (1)", e);
}
// use an invalid address
props.setProperty(DistributionConfig.BIND_ADDRESS_NAME, "bruce.schuchardt");
try {
getSystem(props);
} catch (IllegalArgumentException e) {
getLogWriter().info("caught expected exception (2_", e);
}
// use a valid bind address
props.setProperty(DistributionConfig.BIND_ADDRESS_NAME, InetAddress.getLocalHost().getCanonicalHostName());
getSystem().disconnect();
}
}