blob: 3e3ab70800fd6441e7132d5bc5a21bcc7ded9192 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import static org.apache.geode.distributed.ConfigurationProperties.ACK_SEVERE_ALERT_THRESHOLD;
import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD;
import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
import static org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
import static org.apache.geode.test.dunit.Wait.pause;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.File;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.admin.AdminDistributedSystem;
import org.apache.geode.admin.AdminDistributedSystemFactory;
import org.apache.geode.admin.AlertLevel;
import org.apache.geode.admin.DistributedSystemConfig;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
import org.apache.geode.internal.cache.StateFlushOperation;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.categories.MembershipTest;
/**
* This class tests the functionality of the {@link ClusterDistributionManager} class.
*/
@Category({MembershipTest.class})
public class ClusterDistributionManagerDUnitTest extends CacheTestCase {
private static final Logger logger = LogService.getLogger();
private static volatile boolean regionDestroyedInvoked;
private static volatile Cache myCache;
private static volatile boolean alertReceived;
private transient ExecutorService executorService;
private VM locatorvm;
private VM vm1;
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@Rule
public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
private int locatorPort;
@Before
public void setUp() {
executorService = Executors.newSingleThreadExecutor();
locatorvm = VM.getVM(0);
vm1 = VM.getVM(1);
Invoke.invokeInEveryVM(() -> System.setProperty("p2p.joinTimeout", "120000"));
final int port = locatorvm.invoke(() -> {
System.setProperty(BYPASS_DISCOVERY_PROPERTY, "true");
return Locator.startLocatorAndDS(0, new File(""), new Properties()).getPort();
});
vm1.invoke(() -> locatorPort = port);
locatorPort = port;
}
@Override
public Properties getDistributedSystemProperties() {
Properties result = super.getDistributedSystemProperties();
result.put(ConfigurationProperties.LOCATORS, "localhost[" + locatorPort + "]");
return result;
}
@After
public void tearDown() {
disconnectAllFromDS();
invokeInEveryVM(() -> {
regionDestroyedInvoked = false;
myCache = null;
alertReceived = false;
});
assertThat(executorService.shutdownNow()).isEmpty();
}
@Test
public void testGetDistributionVMType() {
DistributionManager dm = getSystem().getDistributionManager();
InternalDistributedMember member = dm.getId();
assertThat(ClusterDistributionManager.NORMAL_DM_TYPE).isEqualTo(member.getVmKind());
}
/**
* Demonstrate that a new UDP port is used when an attempt is made to reconnect using a shunned
* port
*/
@Test
public void testConnectAfterBeingShunned() {
InternalDistributedSystem system = getSystem();
Distribution membership = MembershipManagerHelper.getDistribution(system);
InternalDistributedMember memberBefore = membership.getLocalMember();
// TODO GMS needs to have a system property allowing the bind-port to be set
System.setProperty(GEMFIRE_PREFIX + "jg-bind-port", "" + memberBefore.getMembershipPort());
system.disconnect();
system = getSystem();
membership = MembershipManagerHelper.getDistribution(system);
system.disconnect();
InternalDistributedMember memberAfter = membership.getLocalMember();
assertThat(memberAfter.getMembershipPort()).isEqualTo(memberBefore.getMembershipPort());
}
/**
* Test the handling of "surprise members" in the membership manager. Create a DistributedSystem
* in this VM and then add a fake member to its surpriseMember set. Then ensure that it stays in
* the set when a new membership view arrives that doesn't contain it. Then wait until the member
* should be gone and force more view processing to have it scrubbed from the set.
**/
@Test
public void testSurpriseMemberHandling() {
System.setProperty(GEMFIRE_PREFIX + "surprise-member-timeout", "3000");
InternalDistributedSystem system = getSystem();
Distribution membershipManager =
MembershipManagerHelper.getDistribution(system);
InternalDistributedMember member = new InternalDistributedMember(getIPLiteral(), 12345);
// first make sure we can't add this as a surprise member (bug #44566)
// if the view number isn't being recorded correctly the test will pass but the
// functionality is broken
assertThat(membershipManager.getView().getViewId()).isGreaterThan(0);
int oldViewId = member.getVmViewId();
member.setVmViewId(membershipManager.getView().getViewId() - 1);
addIgnoredException("attempt to add old member");
addIgnoredException("Removing shunned GemFire node");
boolean accepted = membershipManager.addSurpriseMember(member);
assertThat(accepted).as("member with old ID was not rejected (bug #44566)").isFalse();
member.setVmViewId(oldViewId);
// now forcibly add it as a surprise member and show that it is reaped
long gracePeriod = 5000;
long startTime = System.currentTimeMillis();
long timeout = ((GMSMembership) membershipManager.getMembership()).getSurpriseMemberTimeout();
long birthTime = startTime - timeout + gracePeriod;
MembershipManagerHelper.addSurpriseMember(system, member, birthTime);
assertThat(membershipManager.isSurpriseMember(member)).as("Member was not a surprise member")
.isTrue();
await("waiting for member to be removed")
.until(() -> !membershipManager.isSurpriseMember(member));
}
@Test
public void shutdownMessageCausesListenerInvocation() {
final AtomicBoolean listenerInvoked = new AtomicBoolean();
vm1.invoke("join the cluster", () -> getSystem().getDistributedMember()); // lead member
system = getSystem(); // non-lead member
// this membership listener will be invoked when the shutdown message is received
system.getDistributionManager().addMembershipListener(new MembershipListener() {
@Override
public void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
assertThat(crashed).isFalse();
listenerInvoked.set(Boolean.TRUE);
}
});
final InternalDistributedMember memberID = system.getDistributedMember();
locatorvm.invoke("send a shutdown message", () -> {
final DistributionManager distributionManager =
((InternalDistributedSystem) Locator.getLocator().getDistributedSystem())
.getDistributionManager();
final ShutdownMessage shutdownMessage = new ShutdownMessage();
shutdownMessage.setRecipient(memberID);
shutdownMessage.setDistributionManagerId(distributionManager.getDistributionManagerId());
distributionManager.putOutgoing(shutdownMessage);
});
await().until(() -> listenerInvoked.get());
}
@Test
public void shutdownMessageCausesTargetMemberToLeaveStateFlushReplyProcessor() {
vm1.invoke("join the cluster", () -> getSystem().getDistributedMember()); // lead member
system = getSystem(); // non-lead member
DistributedMember targetId = locatorvm.invoke(() -> {
return Locator.getLocator().getDistributedSystem().getDistributedMember();
});
StateFlushOperation.StateFlushReplyProcessor stateFlushReplyProcessor =
new StateFlushOperation.StateFlushReplyProcessor(getSystem().getDistributionManager(),
new HashSet(), targetId);
system.getDistributionManager().addMembershipListener(stateFlushReplyProcessor);
final InternalDistributedMember memberID = system.getDistributedMember();
locatorvm.invoke("send a shutdown message", () -> {
final DistributionManager distributionManager =
((InternalDistributedSystem) Locator.getLocator().getDistributedSystem())
.getDistributionManager();
final ShutdownMessage shutdownMessage = new ShutdownMessage();
shutdownMessage.setRecipient(memberID);
shutdownMessage.setDistributionManagerId(distributionManager.getDistributionManagerId());
distributionManager.putOutgoing(shutdownMessage);
});
await().untilAsserted(
() -> assertThat(stateFlushReplyProcessor.getTargetMemberHasLeft()).isTrue());
}
/**
* Tests that a severe-level alert is generated if a member does not respond with an ack quickly
* enough. vm0 and vm1 create a region and set ack-severe-alert-threshold. vm1 has a cache
* listener in its region that sleeps when notified, forcing the operation to take longer than
* ack-wait-threshold + ack-severe-alert-threshold
*/
@Test
public void testAckSevereAlertThreshold() {
// in order to set a small ack-wait-threshold, we have to remove the
// system property established by the dunit harness
System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
Properties config = getDistributedSystemProperties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(ACK_WAIT_THRESHOLD, "3");
config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "3");
config.setProperty(NAME, "putter");
getCache(config);
RegionFactory<String, String> regionFactory = getCache().createRegionFactory();
regionFactory.setScope(Scope.DISTRIBUTED_ACK);
regionFactory.setDataPolicy(DataPolicy.REPLICATE);
assertThat(getCache().isClosed()).isFalse();
Region<String, String> region = regionFactory.create("testRegion");
addIgnoredException("elapsed while waiting for replies");
vm1.invoke("Connect to distributed system", () -> {
config.setProperty(NAME, "sleeper");
getSystem(config);
RegionFactory<String, String> regionFactory2 = getCache().createRegionFactory();
regionFactory2.setScope(Scope.DISTRIBUTED_ACK);
regionFactory2.setDataPolicy(DataPolicy.REPLICATE);
regionFactory2.addCacheListener(getSleepingListener(false));
regionFactory2.create("testRegion");
myCache = getCache();
createAlertListener();
});
// now we have two caches set up. vm1 has a listener that will sleep
// and cause the severe-alert threshold to be crossed
region.put("bomb", "pow!"); // this will hang until vm1 responds
disconnectAllFromDS();
vm1.invoke(() -> {
assertThat(alertReceived).isTrue();
});
}
/**
* Tests that a sick member is kicked out
*/
@Test
public void testKickOutSickMember() {
addIgnoredException("10 seconds have elapsed while waiting");
addIgnoredException(MemberDisconnectedException.class);
// in order to set a small ack-wait-threshold, we have to remove the
// system property established by the dunit harness
System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
Properties config = getDistributedSystemProperties();
config.setProperty(MCAST_PORT, "0"); // loner
config.setProperty(ACK_WAIT_THRESHOLD, "5");
config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
config.setProperty(NAME, "putter");
getCache(config);
RegionFactory<String, String> regionFactory = getCache().createRegionFactory();
regionFactory.setScope(Scope.DISTRIBUTED_ACK);
regionFactory.setDataPolicy(DataPolicy.REPLICATE);
Region<String, String> region = regionFactory.create("testRegion");
addIgnoredException("sec have elapsed while waiting for replies");
vm1.invoke(new SerializableRunnable("Connect to distributed system") {
@Override
public void run() {
config.setProperty(NAME, "sleeper");
getCache(config);
addIgnoredException("service failure");
addIgnoredException(ForcedDisconnectException.class.getName());
RegionFactory<String, String> regionFactory2 = getCache().createRegionFactory();
regionFactory2.setScope(Scope.DISTRIBUTED_ACK);
regionFactory2.setDataPolicy(DataPolicy.REPLICATE);
regionFactory2.addCacheListener(getSleepingListener(true));
regionFactory2.create("testRegion");
myCache = getCache();
}
});
// now we have two caches set up, each having an alert listener. Vm1
// also has a cache listener that will turn off its ability to respond
// to "are you dead" messages and then sleep
region.put("bomb", "pow!");
disconnectFromDS();
vm1.invoke("wait for forced disconnect", () -> {
await("vm1's system should have been disconnected")
.untilAsserted(() -> assertThat(basicGetSystem().isConnected()).isFalse());
await("vm1's cache is not closed")
.untilAsserted(() -> assertThat(myCache.isClosed()).isTrue());
await("vm1's listener should have received afterRegionDestroyed notification")
.untilAsserted(() -> assertThat(regionDestroyedInvoked).isTrue());
});
}
/**
* test use of a bad bind-address for bug #32565
*/
@Test
public void testBadBindAddress() throws Exception {
Properties config = getDistributedSystemProperties();
config.setProperty(MCAST_PORT, "0"); // loner
config.setProperty(ACK_WAIT_THRESHOLD, "5");
config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
// use a valid address that's not proper for this machine
config.setProperty(BIND_ADDRESS, "www.yahoo.com");
assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class);
// use an invalid address
config.setProperty(BIND_ADDRESS, "bruce.schuchardt");
assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class);
// use a valid bind address
config.setProperty(BIND_ADDRESS, InetAddress.getLocalHost().getCanonicalHostName());
assertThatCode(this::getSystem).doesNotThrowAnyException();
}
/**
* install a new view and show that waitForViewInstallation works as expected
*/
@Test
public void testWaitForViewInstallation() {
InternalDistributedSystem system = getSystem();
ClusterDistributionManager dm = (ClusterDistributionManager) system.getDM();
MembershipView<InternalDistributedMember> view = dm.getDistribution().getView();
AtomicBoolean waitForViewInstallationDone = new AtomicBoolean();
executorService.submit(() -> {
try {
dm.waitForViewInstallation(view.getViewId() + 1);
waitForViewInstallationDone.set(true);
} catch (InterruptedException e) {
errorCollector.addError(e);
}
});
pause(2000);
vm1.invoke("create another member to initiate a new view", () -> {
getSystem();
});
await()
.untilAsserted(() -> assertThat(waitForViewInstallationDone.get()).isTrue());
}
/**
* show that waitForViewInstallation works as expected when distribution manager is closed
* while waiting for the latest membership view to install
*/
@Test
public void testWaitForViewInstallationDisconnectDS()
throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
InternalDistributedSystem system = getSystem();
ClusterDistributionManager dm = (ClusterDistributionManager) system.getDM();
MembershipView<InternalDistributedMember> view = dm.getDistribution().getView();
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
Future future = executorService.submit(() -> {
try {
cyclicBarrier.await(getTimeout().toMillis(), TimeUnit.MILLISECONDS);
dm.waitForViewInstallation(view.getViewId() + 1);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
errorCollector.addError(e);
}
});
cyclicBarrier.await(getTimeout().toMillis(), TimeUnit.MILLISECONDS);
system.disconnect();
future.get(getTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
private CacheListener<String, String> getSleepingListener(final boolean playDead) {
regionDestroyedInvoked = false;
return new CacheListenerAdapter<String, String>() {
@Override
public void afterCreate(EntryEvent event) {
try {
if (playDead) {
MembershipManagerHelper.beSickMember(getSystemStatic());
MembershipManagerHelper.playDead(getSystemStatic());
}
Thread.sleep(15 * 1000);
} catch (InterruptedException ie) {
errorCollector.addError(ie);
}
}
@Override
public void afterRegionDestroy(RegionEvent event) {
logger.info("afterRegionDestroyed invoked in sleeping listener");
logger.info("<ExpectedException action=remove>service failure</ExpectedException>");
logger.info(
"<ExpectedException action=remove>org.apache.geode.ForcedDisconnectException</ExpectedException>");
regionDestroyedInvoked = true;
}
};
}
private void createAlertListener() throws Exception {
DistributedSystemConfig config =
AdminDistributedSystemFactory.defineDistributedSystem(getSystemStatic(), null);
AdminDistributedSystem adminSystem = AdminDistributedSystemFactory.getDistributedSystem(config);
adminSystem.setAlertLevel(AlertLevel.SEVERE);
adminSystem.addAlertListener(alert -> {
try {
logger.info("alert listener invoked for alert originating in " + alert.getConnectionName());
logger.info(" alert text = " + alert.getMessage());
logger.info(" systemMember = " + alert.getSystemMember());
} catch (Exception e) {
errorCollector.addError(e);
}
alertReceived = true;
});
adminSystem.connect();
assertThat(adminSystem.waitToBeConnected(5 * 1000)).isTrue();
}
}