blob: d2fa9d7e0d3ec3b23eb0befdec99009739d7ee99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed;
import static java.lang.Integer.parseInt;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD;
import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_FLOW_CONTROL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBERSHIP_PORT_RANGE;
import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.distributed.ConfigurationProperties.TCP_PORT;
import static org.apache.geode.distributed.internal.ClusterDistributionManager.NORMAL_DM_TYPE;
import static org.apache.geode.distributed.internal.ClusterDistributionManager.SERIAL_EXECUTOR;
import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_ACK_WAIT_THRESHOLD;
import static org.apache.geode.internal.AvailablePort.MULTICAST;
import static org.apache.geode.internal.AvailablePort.SOCKET;
import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortRange;
import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
import static org.apache.geode.test.dunit.DistributedTestUtils.getDUnitLocatorPort;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.Properties;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializable;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.SerializationException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.SizeableRunnable;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.MembershipTest;
/**
* Tests the functionality of the {@link DistributedSystem} class.
*/
@Category({MembershipTest.class})
public class DistributedSystemDUnitTest extends JUnit4DistributedTestCase {
// Ports picked afresh in before() for every test method.
// Result of getRandomAvailablePort(MULTICAST); set as MCAST_PORT by testConflictingUDPPort.
private int mcastPort;
// Result of getRandomAvailablePort(SOCKET); used to build the START_LOCATOR value
// "localhost[port]" in the tests that start their own locator.
private int locatorPort;
// Result of getRandomAvailablePort(SOCKET); set as TCP_PORT by testSpecificTcpPort.
private int tcpPort;
// First and last element of the array returned by getRandomAvailableTCPPortRange(3, true);
// joined as "lower-upper" for MEMBERSHIP_PORT_RANGE.
private int lowerBoundOfPortRange;
private int upperBoundOfPortRange;
/**
 * Runs {@code disconnectAllFromDS()} and then picks the ports the test methods rely on: one
 * multicast port, two socket ports (locator and tcp) and a range of three TCP ports.
 */
@Before
public void before() throws Exception {
  disconnectAllFromDS();

  mcastPort = getRandomAvailablePort(MULTICAST);
  locatorPort = getRandomAvailablePort(SOCKET);
  tcpPort = getRandomAvailablePort(SOCKET);

  // only the two ends of the range are remembered
  int[] range = getRandomAvailableTCPPortRange(3, true);
  int lastIndex = range.length - 1;
  lowerBoundOfPortRange = range[0];
  upperBoundOfPortRange = range[lastIndex];
}
/**
 * Runs {@code disconnectAllFromDS()} once a test method has finished, so connections opened by
 * the test are not left for the next one.
 */
@After
public void after() throws Exception {
  disconnectAllFromDS();
}
/**
 * Ensures that waitForDeparture flushes the serial message queue of the given member.
 *
 * A runnable is queued on the serial executor of a made-up member. Once waitForDeparture returns
 * {@code true} for that member, the runnable must have run, which is observed through
 * {@code FakeMessage.processed}.
 */
@Test
public void testWaitForDeparture() throws Exception {
  Properties config = new Properties();
  config.put(LOCATORS, "");
  config.put(START_LOCATOR, "localhost[" + this.locatorPort + "]");
  config.put(DISABLE_TCP, "true");
  InternalDistributedSystem system =
      (InternalDistributedSystem) DistributedSystem.connect(config);

  // construct a member ID that will represent a departed member
  InternalDistributedMember member =
      new InternalDistributedMember("localhost", 12345, "", "", NORMAL_DM_TYPE, null, null);

  // schedule a message in order to create a queue for the fake member
  ClusterDistributionManager distributionManager =
      (ClusterDistributionManager) system.getDistributionManager();
  final FakeMessage message = new FakeMessage(null);
  distributionManager.getExecutor(SERIAL_EXECUTOR, member).execute(new SizeableRunnable(100) {
    @Override
    public void run() {
      // block == false: doAction only records that it ran and never touches the null array
      message.doAction(distributionManager, false);
    }

    @Override
    public String toString() {
      return "Processing fake message";
    }
  });

  // AssertJ is used here like everywhere else in this class
  assertThat(distributionManager.getMembershipManager().waitForDeparture(member))
      .as("expected the serial queue to be flushed").isTrue();
  assertThat(message.processed).isTrue();
}
/**
 * Connecting twice with one and the same configuration must hand back the very same
 * DistributedSystem instance.
 */
@Test
public void testGetSameSystemTwice() {
  Properties lonerProps = createLonerConfig();
  // a flow-control setting is included on purpose (bug 37562)
  lonerProps.setProperty(MCAST_FLOW_CONTROL, "3000000,0.20,3000");

  DistributedSystem first = DistributedSystem.connect(lonerProps);
  DistributedSystem second = DistributedSystem.connect(lonerProps);

  assertThat(second).isSameAs(first);
}
/**
 * While a system is connected, asking for one with a different configuration must fail with an
 * IllegalStateException.
 */
@Test
public void testGetDifferentSystem() {
  Properties lonerProps = createLonerConfig();
  lonerProps.setProperty(MCAST_FLOW_CONTROL, "3000000,0.20,3000");
  DistributedSystem.connect(lonerProps);

  // same Properties object, now carrying a multicast address the live system was not given
  lonerProps.setProperty(MCAST_ADDRESS, "224.0.0.1");

  assertThatThrownBy(() -> DistributedSystem.connect(lonerProps))
      .isInstanceOf(IllegalStateException.class);
}
/**
 * After the first system has been disconnected, connecting with a changed configuration is
 * allowed. The test passes when neither connect nor disconnect throws.
 */
@Test
public void testGetDifferentSystemAfterClose() {
  Properties lonerProps = createLonerConfig();
  DistributedSystem.connect(lonerProps).disconnect();

  // any value other than the default will do
  String changedThreshold = String.valueOf(DEFAULT_ACK_WAIT_THRESHOLD + 17);
  lonerProps.put(ACK_WAIT_THRESHOLD, changedThreshold);

  DistributedSystem.connect(lonerProps).disconnect();
}
/**
 * getProperties() must return an object other than the Properties passed to connect, and it must
 * report MCAST_PORT as 0 both while connected and after disconnect.
 */
@Test
public void testGetProperties() {
  final int expectedMcastPort = 0;
  Properties lonerProps = createLonerConfig();
  DistributedSystem system = DistributedSystem.connect(lonerProps);

  // the same two checks are made before and after the disconnect
  Runnable verifyProperties = () -> {
    assertThat(system.getProperties()).isNotSameAs(lonerProps);
    assertThat(parseInt(system.getProperties().getProperty(MCAST_PORT)))
        .isEqualTo(expectedMcastPort);
  };

  verifyProperties.run();
  system.disconnect();
  verifyProperties.run();
}
/**
 * A system built from the loner configuration must still be able to create a cache and a region,
 * and that region must return a value that was put into it.
 */
@Test
public void testIsolatedDistributedSystem() throws Exception {
  Properties config = createLonerConfig();
  InternalDistributedSystem system = getSystem(config);

  // make sure isolated distributed system can still create a cache and region
  Cache cache = CacheFactory.create(system);
  // parameterized instead of raw types, so put/get are checked by the compiler
  Region<String, String> region =
      cache.createRegion(getUniqueName(), new AttributesFactory<String, String>().create());
  region.put("test", "value");
  assertThat(region.get("test")).isEqualTo("value");
}
/**
 * Checks that the port given as TCP_PORT is the one reported by the membership manager as its
 * direct channel port.
 */
@Test
public void testSpecificTcpPort() throws Exception {
  String dunitLocator = "localhost[" + getDUnitLocatorPort() + "]";
  Properties props = new Properties();
  props.put(LOCATORS, dunitLocator);
  props.setProperty(TCP_PORT, String.valueOf(tcpPort));

  ClusterDistributionManager manager =
      (ClusterDistributionManager) getSystem(props).getDistributionManager();
  GMSMembershipManager membership = (GMSMembershipManager) manager.getMembershipManager();

  assertThat(membership.getDirectChannelPort()).isEqualTo(tcpPort);
}
/**
 * Checks that a loopback address is rejected as bind address, with a GemFireConfigException,
 * when the locator in use has no bind address of its own.
 */
@Test
public void testLoopbackNotAllowed() throws Exception {
  // without a loopback address there is nothing to test, so fail right away
  InetAddress loopbackAddress = getLoopback();
  assertThat(loopbackAddress).isNotNull();

  String locatorSpec = getLocalHost().getHostName() + "[" + getDUnitLocatorPort() + "]";
  Properties props = new Properties();
  props.put(LOCATORS, locatorSpec);
  props.setProperty(BIND_ADDRESS, loopbackAddress.getHostAddress());

  getLogWriter()
      .info("attempting to connect with " + loopbackAddress + " and locators=" + locatorSpec);
  assertThatThrownBy(() -> getSystem(props)).isInstanceOf(GemFireConfigException.class);
}
/**
 * Connects with MEMBERSHIP_PORT_RANGE set to the range picked in before() and checks that the
 * ports of the resulting member id lie inside that range.
 */
@Test
public void testPortRange() throws Exception {
  String portRange = lowerBoundOfPortRange + "-" + upperBoundOfPortRange;
  Properties props = new Properties();
  props.put(LOCATORS, "localhost[" + getDUnitLocatorPort() + "]");
  props.setProperty(MEMBERSHIP_PORT_RANGE, portRange);

  ClusterDistributionManager manager =
      (ClusterDistributionManager) getSystem(props).getDistributionManager();
  InternalDistributedMember self = manager.getDistributionManagerId();

  verifyMembershipPortsInRange(self, lowerBoundOfPortRange, upperBoundOfPortRange);
}
/**
 * Connects in this VM with a locator, a multicast port and a membership port range, then has VM
 * 1 try to connect with the same multicast port and port range. That second connect must fail
 * with a GemFireConfigException or a SystemConnectException.
 */
@Test
public void testConflictingUDPPort() {
  String locatorSpec = "localhost[" + locatorPort + "]";
  String portRange = lowerBoundOfPortRange + "-" + upperBoundOfPortRange;

  Properties config = new Properties();
  config.setProperty(MCAST_PORT, String.valueOf(mcastPort));
  config.setProperty(START_LOCATOR, locatorSpec);
  config.setProperty(MEMBERSHIP_PORT_RANGE, portRange);
  DistributedSystem.connect(config);

  VM otherVm = VM.getVM(1);
  IgnoredException.addIgnoredException("SystemConnectException", otherVm);
  otherVm.invoke(() -> {
    // in the other VM, use the locator started above instead of starting a second one
    config.put(LOCATORS, config.remove(START_LOCATOR));
    assertThatThrownBy(() -> DistributedSystem.connect(config))
        .isInstanceOfAny(GemFireConfigException.class, SystemConnectException.class);
  });
}
/**
 * Connects VM 0 and then VM 1 with the same REDUNDANCY_ZONE and checks, in VM 1, that two
 * members are reported for its own zone.
 */
@Test
public void memberShouldWaitUntilAStartupResponseIsReceived() {
  Properties zoneProps = new Properties();
  zoneProps.setProperty(REDUNDANCY_ZONE, "testzone");

  VM.getVM(0).invoke(() -> {
    getSystem(zoneProps);
  });

  VM.getVM(1).invoke(() -> {
    InternalDistributedSystem system = getSystem(zoneProps);
    // The redundancy zone travels with the startup response. Seeing VM 0 in the same zone as
    // this member therefore shows that a startup response was received.
    assertThat(
        system.getDistributionManager().getMembersInSameZone(system.getDistributedMember()))
            .hasSize(2);
  });
}
/**
 * VM 0 sends every joining member a function that cannot be deserialized; VM 1 then joins. The
 * test passes when the join in VM 1 returns within the timeout instead of hanging.
 */
@Test(timeout = 600_000)
public void failedMessageReceivedBeforeStartupShouldNotDeadlock() {
  // VM 0: install a membership listener that sends a message to any new member that joins.
  // The message fails to deserialize, which triggers a failure reply.
  VM.getVM(0).invoke(() -> {
    DistributionManager manager = getSystem().getDM();
    MembershipListener greeter = new MembershipListener() {
      @Override
      public void memberJoined(DistributionManager distributionManager,
          InternalDistributedMember id) {
        FunctionService.onMember(id).execute(new FailDeserializationFunction());
      }
    };
    manager.addMembershipListener(greeter);
  });

  // VM 1: join the system, which fires the listener above. If the failed deserialization
  // caused a deadlock, this call would hang.
  VM.getVM(1).invoke(() -> {
    IgnoredException.addIgnoredException(SerializationException.class);
    getSystem();
  });
}
/**
 * Checks that a distributed system configured with a cache-xml-file of "" does not initialize a
 * cache on its own, and that a cache can still be created afterwards.
 *
 * Verifies: "Allow the cache-xml-file specification to be an empty string"
 *
 * @since GemFire 4.0
 */
@Test
public void testEmptyCacheXmlFile() throws Exception {
  Properties lonerProps = createLonerConfig();
  lonerProps.setProperty(CACHE_XML_FILE, "");
  DistributedSystem system = DistributedSystem.connect(lonerProps);

  // no cache may exist yet, so looking one up has to fail
  assertThatThrownBy(() -> CacheFactory.getInstance(system)).isInstanceOf(CancelException.class);

  // creating one explicitly must still work
  Cache created = CacheFactory.create(system);
  assertThat(created).isNotNull();
  assertThat(created.isClosed()).isFalse();
}
/**
 * Builds the configuration the loner tests start from: MCAST_PORT set to "0" and LOCATORS set to
 * the empty string.
 *
 * @return a new Properties object that the caller is free to modify
 */
private Properties createLonerConfig() {
  Properties lonerProps = new Properties();
  lonerProps.setProperty(LOCATORS, "");
  lonerProps.setProperty(MCAST_PORT, "0");
  return lonerProps;
}
/**
 * Asserts that both the port and the direct channel port of the member lie within the given
 * bounds.
 *
 * @param member the member id whose ports are checked
 * @param lowerBound lowest acceptable port, inclusive
 * @param upperBound highest acceptable port, inclusive
 */
private void verifyMembershipPortsInRange(final InternalDistributedMember member,
    final int lowerBound, final int upperBound) {
  assertThat(member.getPort()).isGreaterThanOrEqualTo(lowerBound)
      .isLessThanOrEqualTo(upperBound);
  assertThat(member.getDirectChannelPort()).isGreaterThanOrEqualTo(lowerBound)
      .isLessThanOrEqualTo(upperBound);
}
/**
 * Looks through all network interfaces for a loopback address of the same address family (IPv4
 * or IPv6) as the address returned by {@code getLocalHost()}.
 *
 * @return the first matching loopback address, or {@code null} when no interface has one
 * @throws SocketException if the network interfaces cannot be enumerated
 * @throws UnknownHostException if the local host address cannot be determined
 */
private InetAddress getLoopback() throws SocketException, UnknownHostException {
  // The wanted address family is the same for every candidate, so it is determined once up
  // front rather than once per inspected address.
  Class<? extends InetAddress> wantedFamily =
      getLocalHost() instanceof Inet4Address ? Inet4Address.class : Inet6Address.class;

  Enumeration<NetworkInterface> interfaces = getNetworkInterfaces();
  while (interfaces.hasMoreElements()) {
    Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
    while (addresses.hasMoreElements()) {
      InetAddress address = addresses.nextElement();
      if (address.isLoopbackAddress() && wantedFamily.isInstance(address)) {
        return address;
      }
    }
  }
  return null;
}
/**
 * Minimal message that testWaitForDeparture uses to observe that work queued on a member's
 * serial executor has run. The test calls {@link #doAction} directly and then reads
 * {@code processed}.
 */
private static class FakeMessage extends SerialDistributionMessage {
  // Monitor and flag for the blocking mode of doAction. The only caller in this file passes
  // null and never asks doAction to block.
  private final boolean[] blocked;
  // Set by doAction; the test asserts on it after waitForDeparture has returned.
  private volatile boolean processed;

  FakeMessage(boolean[] blocked) {
    this.blocked = blocked;
  }

  /**
   * Records that the message was processed. When {@code block} is true it also sets
   * {@code blocked[0]}, notifies one waiter on the array and then waits on it for up to 60
   * seconds.
   *
   * @param dm not used
   * @param block whether to wait on the array given to the constructor; passing true when that
   *        array is null ends in a NullPointerException
   */
  public void doAction(ClusterDistributionManager dm, boolean block) {
    this.processed = true;
    if (block) {
      synchronized (this.blocked) {
        this.blocked[0] = true;
        this.blocked.notify();
        try {
          this.blocked.wait(60000);
        } catch (InterruptedException e) {
          // stop waiting, but keep the interrupt visible to the thread running this message
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  @Override
  public int getDSFID() {
    return 0; // never serialized
  }

  @Override
  protected void process(ClusterDistributionManager dm) {
    // this is never called
  }

  @Override
  public String toString() {
    return "FakeMessage(blocking=" + (this.blocked != null) + ")";
  }
}
/**
 * A function whose {@code fromData} always throws, so it can never be deserialized. Used by
 * failedMessageReceivedBeforeStartupShouldNotDeadlock to provoke a deserialization failure.
 */
public static class FailDeserializationFunction
    implements org.apache.geode.cache.execute.Function, DataSerializable {

  @Override
  public void execute(FunctionContext context) {
    // nothing to execute
  }

  @Override
  public void toData(DataOutput out) throws IOException {
    // no state to write
  }

  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    throw new ClassNotFoundException("Fake class not found");
  }
}
}