blob: d2c171cdfc3591bf453f3017cecf08f9fb081fb3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsType;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.InternalClientMembership;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.management.membership.ClientMembership;
import org.apache.geode.management.membership.ClientMembershipEvent;
import org.apache.geode.management.membership.ClientMembershipListener;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* Tests the ClientMembership API including ClientMembershipListener.
*
* @since GemFire 4.2.1
*/
@Category({ClientServerTest.class})
public class ClientMembershipDUnitTest extends ClientServerTestCase {
// Boolean markers for the two kinds of member; neither is referenced in the visible part of
// this file.
protected static final boolean CLIENT = true;
protected static final boolean SERVER = false;
// Slot indexes into the fired / member / memberId / isClient arrays that the test listeners
// fill in: one slot per ClientMembershipListener callback.
protected static final int JOINED = 0;
protected static final int LEFT = 1;
protected static final int CRASHED = 2;
// Reset to null by cleanup(); assigned by the client-side callable in
// testClientMembershipEventsInServer. NOTE(review): its consumer is not visible here.
private static Properties properties;
// Placeholder server location handed to the InternalClientMembership.notifyServer* calls.
ServerLocation serverLocation = new ServerLocation("127.0.0.1", 0);
/**
 * Tear-down hook that runs {@link #cleanup()} through {@code Invoke.invokeInEveryVM}, so the
 * static state touched by these tests is reset wherever it may have been set.
 */
@Override
public final void postTearDownCacheTestCase() throws Exception {
  Invoke.invokeInEveryVM(ClientMembershipDUnitTest::cleanup);
}
/**
 * Resets the static state shared by the tests: clears the {@code properties} field and
 * unregisters all client membership listeners.
 */
public static void cleanup() {
  ClientMembershipDUnitTest.properties = null;
  InternalClientMembership.unregisterAllListeners();
}
/**
 * Polls the {@code acceptsInProgress} statistic until it equals {@code target}, giving up after
 * 300 seconds.
 *
 * @param target the number of in-progress accepts to wait for
 */
private void waitForAcceptsInProgressToBe(final int target) throws Exception {
  // Compare against the requested target. Comparing the statistic with a second read of itself
  // is (almost) always true and turns the wait into a no-op.
  await().timeout(300, TimeUnit.SECONDS).until(() -> getAcceptsInProgress() == target);
}

/**
 * Reads the {@code acceptsInProgress} statistic from the first {@code CacheServerStats} instance
 * found in this VM's distributed system.
 *
 * @return the current value of the {@code acceptsInProgress} statistic
 */
protected int getAcceptsInProgress() {
  DistributedSystem distributedSystem = getCache().getDistributedSystem();
  StatisticsType st = distributedSystem.findType("CacheServerStats");
  Statistics[] s = distributedSystem.findStatisticsByType(st);
  return s[0].getInt("acceptsInProgress");
}

/**
 * Socket opened by {@link #testConnectionTimeout()} in the remote VM. It connects to the cache
 * server but never sends anything. Static so that the separate "connect" and "close" invocations
 * in that VM share it.
 */
protected static Socket meanSocket;

/**
 * test that a server times out waiting for a handshake that never arrives.
 *
 * <p>
 * A raw socket is connected from vm0 to a cache server started in this VM. The test first checks
 * that the accept goes away when the socket is closed, then that it goes away on its own (the
 * accept timeout is set to 1000 through {@code AcceptorImpl.ACCEPT_TIMEOUT_PROPERTY_NAME}) when
 * the socket is left open.
 */
@Test
public void testConnectionTimeout() throws Exception {
  IgnoredException.addIgnoredException("failed accepting client connection");
  final Host host = Host.getHost(0);
  final String hostName = NetworkUtils.getServerHostName(host);
  final VM vm0 = host.getVM(0);
  System.setProperty(AcceptorImpl.ACCEPT_TIMEOUT_PROPERTY_NAME, "1000");
  try {
    final int port = startBridgeServer(0);
    try {
      assertTrue(port != 0);
      SerializableRunnable createMeanSocket =
          new CacheSerializableRunnable("Connect to server with socket") {
            @Override
            public void run2() throws CacheException {
              getCache(); // create a cache so we have stats
              System.out.println("connecting to cache server with socket");
              try {
                InetAddress addr = InetAddress.getByName(hostName);
                meanSocket = new Socket(addr, port);
              } catch (Exception e) {
                throw new RuntimeException("Test failed to connect or was interrupted", e);
              }
            }
          };
      SerializableRunnable closeMeanSocket = new CacheSerializableRunnable("close mean socket") {
        @Override
        public void run2() throws CacheException {
          System.out.println("closing mean socket");
          // Detach first so a failed connect (socket never assigned) cannot cause an NPE here.
          Socket socket = meanSocket;
          meanSocket = null;
          if (socket == null) {
            return;
          }
          try {
            socket.close();
          } catch (IOException ignore) {
            // best effort: the socket is being discarded anyway
          }
        }
      };
      assertEquals(0, getAcceptsInProgress());
      System.out.println("creating mean socket");
      // Pass the runnable itself. Wrapping it as "() -> createMeanSocket" yields a
      // value-returning lambda that only hands the runnable back and never executes it.
      vm0.invoke("Connect to server with socket", createMeanSocket);
      try {
        System.out.println("waiting to see it connect on server");
        waitForAcceptsInProgressToBe(1);
      } finally {
        System.out.println("closing mean socket");
        vm0.invoke("close mean socket", closeMeanSocket);
      }
      System.out.println("waiting to see accept to go away on server");
      waitForAcceptsInProgressToBe(0);
      // now try it without a close. Server should timeout the mean connect
      System.out.println("creating mean socket 2");
      vm0.invoke("Connect to server with socket", createMeanSocket);
      try {
        System.out.println("waiting to see it connect on server 2");
        waitForAcceptsInProgressToBe(1);
        System.out.println("waiting to see accept to go away on server without us closing");
        waitForAcceptsInProgressToBe(0);
      } finally {
        System.out.println("closing mean socket 2");
        vm0.invoke("close mean socket", closeMeanSocket);
      }
    } finally {
      stopBridgeServers(getCache());
    }
  } finally {
    System.getProperties().remove(AcceptorImpl.ACCEPT_TIMEOUT_PROPERTY_NAME);
  }
}
/**
 * Runs the basic event checks of {@link #doTestBasicEvents()} with
 * {@code InternalClientMembership} forced into synchronous mode; the flag is switched back to
 * {@code false} afterwards, whether or not the checks pass.
 */
@Test
public void testSynchronousEvents() throws Exception {
  getSystem();
  InternalClientMembership.setForceSynchronous(true);
  try {
    doTestBasicEvents();
  } finally {
    // always restore the setting so later tests are not affected
    InternalClientMembership.setForceSynchronous(false);
  }
}
/**
 * Tests event notification methods on ClientMembership, using the checks in
 * {@link #doTestBasicEvents()} without forcing synchronous delivery.
 */
@Test
public void testBasicEvents() throws Exception {
  getSystem();
  doTestBasicEvents();
}
/**
 * Registers a listener and fires, one at a time, the joined / left / crashed notifications for a
 * server and for a client. After each notification it verifies that only the matching callback
 * ran and that it received the expected member, id and client flag.
 *
 * <p>
 * The listener records its results in arrays indexed by {@code JOINED}, {@code LEFT} and
 * {@code CRASHED}. Each callback stores the event details first and raises its {@code fired}
 * flag last, because the test thread awaits only on the flag and then asserts on the details.
 */
public void doTestBasicEvents() throws Exception {
  final boolean[] fired = new boolean[3];
  final DistributedMember[] member = new DistributedMember[3];
  final String[] memberId = new String[3];
  final boolean[] isClient = new boolean[3];
  ClientMembershipListener listener = new ClientMembershipListener() {
    @Override
    public void memberJoined(ClientMembershipEvent event) {
      member[JOINED] = event.getMember();
      memberId[JOINED] = event.getMemberId();
      isClient[JOINED] = event.isClient();
      fired[JOINED] = true; // set last: the test thread waits on this flag
    }

    @Override
    public void memberLeft(ClientMembershipEvent event) {
      member[LEFT] = event.getMember();
      memberId[LEFT] = event.getMemberId();
      isClient[LEFT] = event.isClient();
      fired[LEFT] = true; // set last: the test thread waits on this flag
    }

    @Override
    public void memberCrashed(ClientMembershipEvent event) {
      member[CRASHED] = event.getMember();
      memberId[CRASHED] = event.getMemberId();
      isClient[CRASHED] = event.isClient();
      fired[CRASHED] = true; // set last: the test thread waits on this flag
    }
  };
  ClientMembership.registerClientMembershipListener(listener);

  // test JOIN for server
  InternalClientMembership.notifyServerJoined(serverLocation);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[JOINED]);
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // test JOIN for client
  DistributedMember clientJoined = new TestDistributedMember("clientJoined");
  InternalClientMembership.notifyClientJoined(clientJoined);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[JOINED]);
  assertTrue(fired[JOINED]);
  assertEquals(clientJoined, member[JOINED]);
  assertEquals(clientJoined.getId(), memberId[JOINED]);
  assertTrue(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // test LEFT for server
  InternalClientMembership.notifyServerLeft(serverLocation);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[LEFT]);
  assertFalse(fired[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertTrue(fired[LEFT]);
  assertNotNull(member[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // test LEFT for client
  DistributedMember clientLeft = new TestDistributedMember("clientLeft");
  InternalClientMembership.notifyClientLeft(clientLeft);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[LEFT]);
  assertFalse(fired[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertTrue(fired[LEFT]);
  assertEquals(clientLeft, member[LEFT]);
  assertEquals(clientLeft.getId(), memberId[LEFT]);
  assertTrue(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // test CRASHED for server
  InternalClientMembership.notifyServerCrashed(serverLocation);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[CRASHED]);
  assertFalse(fired[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertTrue(fired[CRASHED]);
  assertNotNull(member[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // test CRASHED for client
  DistributedMember clientCrashed = new TestDistributedMember("clientCrashed");
  InternalClientMembership.notifyClientCrashed(clientCrashed);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[CRASHED]);
  assertFalse(fired[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertTrue(fired[CRASHED]);
  assertEquals(clientCrashed, member[CRASHED]);
  assertEquals(clientCrashed.getId(), memberId[CRASHED]);
  assertTrue(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);
}
/**
 * Clears the first {@code fired.length} slots of the four listener-result arrays: boolean slots
 * become {@code false}, member and id slots become {@code null}. Callers in this file pass
 * arrays of equal length.
 */
private void resetArraysForTesting(boolean[] fired, DistributedMember[] member, String[] memberId,
    boolean[] isClient) {
  final int slots = fired.length;
  Arrays.fill(fired, 0, slots, false);
  Arrays.fill(member, 0, slots, null);
  Arrays.fill(memberId, 0, slots, null);
  Arrays.fill(isClient, 0, slots, false);
}
/**
 * Tests unregisterClientMembershipListener to ensure that no further events are delivered to
 * unregistered listeners.
 *
 * <p>
 * The listener records the joined event in single-element arrays. It stores the event details
 * first and raises {@code fired[0]} last, because the test thread awaits only on that flag and
 * then asserts on the details.
 */
@Test
public void testUnregisterClientMembershipListener() throws Exception {
  final boolean[] fired = new boolean[1];
  final DistributedMember[] member = new DistributedMember[1];
  final String[] memberId = new String[1];
  final boolean[] isClient = new boolean[1];
  getSystem();
  ClientMembershipListener listener = new ClientMembershipListener() {
    @Override
    public void memberJoined(ClientMembershipEvent event) {
      member[0] = event.getMember();
      memberId[0] = event.getMemberId();
      isClient[0] = event.isClient();
      fired[0] = true; // set last: the test thread waits on this flag
    }

    @Override
    public void memberLeft(ClientMembershipEvent event) {}

    @Override
    public void memberCrashed(ClientMembershipEvent event) {}
  };
  ClientMembership.registerClientMembershipListener(listener);

  // fire event to make sure listener is registered
  DistributedMember clientJoined = new TestDistributedMember("clientJoined");
  InternalClientMembership.notifyClientJoined(clientJoined);
  // the arrays here have a single slot, so index them with 0 rather than the JOINED constant
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[0]);
  assertTrue(fired[0]);
  assertEquals(clientJoined, member[0]);
  assertEquals(clientJoined.getId(), memberId[0]);
  assertTrue(isClient[0]);
  resetArraysForTesting(fired, member, memberId, isClient);
  assertFalse(fired[0]);
  assertNull(memberId[0]);
  assertFalse(isClient[0]);

  // unregister and verify listener is not notified
  ClientMembership.unregisterClientMembershipListener(listener);
  InternalClientMembership.notifyClientJoined(clientJoined);
  // NOTE(review): the condition is trivially true, so the only pause before the negative
  // assertions is whatever initial poll delay await() applies - confirm that is sufficient.
  await().until(() -> true);
  assertFalse(fired[0]);
  assertNull(member[0]);
  assertNull(memberId[0]);
  assertFalse(isClient[0]);
}
/**
 * Registers and unregisters four listeners in varying combinations and verifies, after each
 * change, both the content and order of the registered-listener array and which listeners
 * receive a client-joined notification.
 *
 * <p>
 * Each listener records the event in its own slot of the result arrays. It stores the event
 * details first and raises its {@code fired} flag last, because the test thread awaits only on
 * the flags and then asserts on the details.
 */
@Test
public void testMultipleListeners() throws Exception {
  final int NUM_LISTENERS = 4;
  final boolean[] fired = new boolean[NUM_LISTENERS];
  final DistributedMember[] member = new DistributedMember[NUM_LISTENERS];
  final String[] memberId = new String[NUM_LISTENERS];
  final boolean[] isClient = new boolean[NUM_LISTENERS];
  getSystem();
  final ClientMembershipListener[] listeners = new ClientMembershipListener[NUM_LISTENERS];
  for (int i = 0; i < NUM_LISTENERS; i++) {
    final int whichListener = i;
    listeners[i] = new ClientMembershipListener() {
      @Override
      public void memberJoined(ClientMembershipEvent event) {
        // a listener must be invoked at most once between resets
        assertFalse(fired[whichListener]);
        assertNull(member[whichListener]);
        assertNull(memberId[whichListener]);
        assertFalse(isClient[whichListener]);
        member[whichListener] = event.getMember();
        memberId[whichListener] = event.getMemberId();
        isClient[whichListener] = event.isClient();
        fired[whichListener] = true; // set last: the test thread waits on this flag
      }

      @Override
      public void memberLeft(ClientMembershipEvent event) {}

      @Override
      public void memberCrashed(ClientMembershipEvent event) {}
    };
  }
  final DistributedMember clientJoined = new TestDistributedMember("clientJoined");

  // nothing registered yet: nobody may be notified
  InternalClientMembership.notifyClientJoined(clientJoined);
  assertNoneFiredAfterGracePeriod(listeners, fired, member, memberId, isClient);

  // attempt to register same listener twice... 2nd reg should be ignored
  // failure would cause an assertion failure in memberJoined impl
  ClientMembership.registerClientMembershipListener(listeners[0]);
  ClientMembership.registerClientMembershipListener(listeners[0]);
  assertRegisteredInOrder(listeners[0]);
  ClientMembership.registerClientMembershipListener(listeners[1]);
  assertRegisteredInOrder(listeners[0], listeners[1]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 0, 1);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.unregisterClientMembershipListener(listeners[0]);
  assertRegisteredInOrder(listeners[1]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 1);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.registerClientMembershipListener(listeners[2]);
  ClientMembership.registerClientMembershipListener(listeners[3]);
  assertRegisteredInOrder(listeners[1], listeners[2], listeners[3]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 1, 2, 3);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.registerClientMembershipListener(listeners[0]);
  assertRegisteredInOrder(listeners[1], listeners[2], listeners[3], listeners[0]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 0, 1, 2, 3);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.unregisterClientMembershipListener(listeners[3]);
  assertRegisteredInOrder(listeners[1], listeners[2], listeners[0]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 0, 1, 2);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.unregisterClientMembershipListener(listeners[2]);
  assertRegisteredInOrder(listeners[1], listeners[0]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 0, 1);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.unregisterClientMembershipListener(listeners[1]);
  ClientMembership.unregisterClientMembershipListener(listeners[0]);
  assertRegisteredInOrder();
  InternalClientMembership.notifyClientJoined(clientJoined);
  assertNoneFiredAfterGracePeriod(listeners, fired, member, memberId, isClient);
  resetArraysForTesting(fired, member, memberId, isClient);

  ClientMembership.registerClientMembershipListener(listeners[1]);
  assertRegisteredInOrder(listeners[1]);
  InternalClientMembership.notifyClientJoined(clientJoined);
  awaitOnlyTheseFired(clientJoined, fired, member, memberId, isClient, 1);
}

/**
 * Asserts that the currently registered client membership listeners are exactly
 * {@code expected}, in that order.
 */
private static void assertRegisteredInOrder(ClientMembershipListener... expected) {
  ClientMembershipListener[] registered = ClientMembership.getClientMembershipListeners();
  assertEquals(expected.length, registered.length);
  for (int i = 0; i < expected.length; i++) {
    assertEquals(expected[i], registered[i]);
  }
}

/**
 * Waits up to 300 seconds for every slot listed in {@code expectedFired} to be flagged, then
 * asserts that those slots hold {@code joined} as a client and that all other slots are still
 * empty.
 */
private static void awaitOnlyTheseFired(DistributedMember joined, final boolean[] fired,
    DistributedMember[] member, String[] memberId, boolean[] isClient,
    final int... expectedFired) {
  await().timeout(300, TimeUnit.SECONDS).until(() -> {
    for (int index : expectedFired) {
      if (!fired[index]) {
        return false;
      }
    }
    return true;
  });
  for (int i = 0; i < fired.length; i++) {
    boolean shouldHaveFired = false;
    for (int index : expectedFired) {
      if (index == i) {
        shouldHaveFired = true;
        break;
      }
    }
    if (shouldHaveFired) {
      assertTrue(fired[i]);
      assertEquals(joined, member[i]);
      assertEquals(joined.getId(), memberId[i]);
      assertTrue(isClient[i]);
    } else {
      assertFalse(fired[i]);
      assertNull(member[i]);
      assertNull(memberId[i]);
      assertFalse(isClient[i]);
    }
  }
}

/**
 * Negative check: pauses 20 ms per listener (a timed wait on a monitor nothing notifies) and
 * asserts that the listener's slot is still empty.
 */
private static void assertNoneFiredAfterGracePeriod(ClientMembershipListener[] listeners,
    boolean[] fired, DistributedMember[] member, String[] memberId, boolean[] isClient)
    throws InterruptedException {
  for (int i = 0; i < listeners.length; i++) {
    synchronized (listeners[i]) {
      listeners[i].wait(20);
    }
    assertFalse(fired[i]);
    assertNull(member[i]);
    assertNull(memberId[i]);
    assertFalse(isClient[i]);
  }
}
/**
 * Port of the cache server started in the remote VM by testClientMembershipEventsInClient. It is
 * static so that a later invocation in that VM can read it back.
 */
protected static int testClientMembershipEventsInClient_port;

/**
 * Returns the value of {@link #testClientMembershipEventsInClient_port} in the VM where this is
 * invoked.
 */
private static int getTestClientMembershipEventsInClient_port() {
  return ClientMembershipDUnitTest.testClientMembershipEventsInClient_port;
}
/**
 * Tests notification of events in client process. Bridge clients detect server joins when the
 * client connects to the server. If the server crashes or departs gracefully, the client will
 * detect this as a crash.
 *
 * <p>
 * The listener in this (controller) VM records joined and crashed events in arrays indexed by
 * {@code JOINED} and {@code CRASHED}. It stores the event details first and raises the
 * {@code fired} flag last, because the test thread awaits only on the flags and then asserts on
 * the details. The cache server lives in vm0.
 */
@Test
public void testClientMembershipEventsInClient() throws Exception {
  properties = null;
  getSystem();
  IgnoredException.addIgnoredException("IOException");
  final boolean[] fired = new boolean[3];
  final DistributedMember[] member = new DistributedMember[3];
  final String[] memberId = new String[3];
  final boolean[] isClient = new boolean[3];
  // create and register ClientMembershipListener in controller vm...
  ClientMembershipListener listener = new ClientMembershipListener() {
    @Override
    public void memberJoined(ClientMembershipEvent event) {
      System.out.println("[testClientMembershipEventsInClient] memberJoined: " + event);
      member[JOINED] = event.getMember();
      memberId[JOINED] = event.getMemberId();
      isClient[JOINED] = event.isClient();
      fired[JOINED] = true; // set last: the test thread waits on this flag
    }

    @Override
    public void memberLeft(ClientMembershipEvent event) {
      System.out.println("[testClientMembershipEventsInClient] memberLeft: " + event);
    }

    @Override
    public void memberCrashed(ClientMembershipEvent event) {
      System.out.println("[testClientMembershipEventsInClient] memberCrashed: " + event);
      member[CRASHED] = event.getMember();
      memberId[CRASHED] = event.getMemberId();
      isClient[CRASHED] = event.isClient();
      fired[CRASHED] = true; // set last: the test thread waits on this flag
    }
  };
  ClientMembership.registerClientMembershipListener(listener);
  final VM vm0 = Host.getHost(0).getVM(0);
  final String name = getUniqueName();
  final int[] ports = new int[1];
  // create BridgeServer in vm0...
  vm0.invoke("create cache server", () -> {
    try {
      System.out.println("[testClientMembershipEventsInClient] Create BridgeServer");
      getSystem();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      Region region = createRegion(name, factory.create());
      assertNotNull(region);
      assertNotNull(getRootRegion().getSubregion(name));
      testClientMembershipEventsInClient_port = startBridgeServer(0);
    } catch (IOException e) {
      getSystem().getLogWriter().fine(new Exception(e));
      fail("Failed to start CacheServer: " + e.getMessage());
    }
  });
  // gather details for later creation of ConnectionPool...
  ports[0] = vm0.invoke("getTestClientMembershipEventsInClient_port",
      ClientMembershipDUnitTest::getTestClientMembershipEventsInClient_port);
  assertTrue(ports[0] != 0);
  DistributedMember serverMember = vm0.invoke("get distributed member",
      () -> getSystem().getDistributedMember());
  String serverMemberId = serverMember.toString();
  System.out.println("[testClientMembershipEventsInClient] ports[0]=" + ports[0]);
  System.out.println("[testClientMembershipEventsInClient] serverMember=" + serverMember);
  System.out.println("[testClientMembershipEventsInClient] serverMemberId=" + serverMemberId);
  assertFalse(fired[JOINED]);
  assertNull(member[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(member[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(member[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);

  // sanity check...
  System.out.println("[testClientMembershipEventsInClient] sanity check");
  InternalClientMembership.notifyServerJoined(serverLocation);
  await().timeout(300, SECONDS)
      .until(() -> fired[JOINED] || fired[CRASHED]);
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(member[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(member[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // create bridge client in controller vm...
  System.out.println("[testClientMembershipEventsInClient] create bridge client");
  Properties config = new Properties();
  config.setProperty(MCAST_PORT, "0");
  config.setProperty(LOCATORS, "");
  config.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, "false");
  getSystem(config);
  try {
    getCache();
    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.LOCAL);
    ClientServerTestCase.configureConnectionPool(factory,
        NetworkUtils.getServerHostName(Host.getHost(0)), ports, true, -1, -1, null);
    createRegion(name, factory.create());
    assertNotNull(getRootRegion().getSubregion(name));
  } catch (CacheException ex) {
    Assert.fail("While creating Region on Edge", ex);
  }
  await().timeout(300, SECONDS)
      .until(() -> fired[JOINED] || fired[CRASHED]);
  System.out.println("[testClientMembershipEventsInClient] assert client detected server join");
  // first check the getCurrentServers() result
  ClientCache clientCache = (ClientCache) getCache();
  Set<InetSocketAddress> servers = clientCache.getCurrentServers();
  assertFalse(servers.isEmpty());
  // now check listener results
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertNotNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(member[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(member[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  vm0.invoke("Stop BridgeServer", () -> stopBridgeServers(getCache()));
  await().timeout(300, SECONDS)
      .until(() -> fired[JOINED] || fired[CRASHED]);
  System.out
      .println("[testClientMembershipEventsInClient] assert client detected server departure");
  assertFalse(fired[JOINED]);
  assertNull(member[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(member[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertTrue(fired[CRASHED]);
  assertNotNull(member[CRASHED]);
  assertNotNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // now test that we rediscover the cache server
  vm0.invoke("Recreate BridgeServer", () -> {
    try {
      System.out.println("[testClientMembershipEventsInClient] restarting BridgeServer");
      startBridgeServer(ports[0]);
    } catch (IOException e) {
      getSystem().getLogWriter().fine(new Exception(e));
      // the server is restarted in vm0, so the message must not name another VM
      fail("Failed to restart CacheServer: " + e.getMessage());
    }
  });
  await().timeout(300, SECONDS)
      .until(() -> fired[JOINED] || fired[CRASHED]);
  System.out
      .println("[testClientMembershipEventsInClient] assert client detected server recovery");
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertNotNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  assertFalse(fired[LEFT]);
  assertNull(member[LEFT]);
  assertNull(memberId[LEFT]);
  assertFalse(isClient[LEFT]);
  assertFalse(fired[CRASHED]);
  assertNull(member[CRASHED]);
  assertNull(memberId[CRASHED]);
  assertFalse(isClient[CRASHED]);
}
/**
* Tests notification of events in server process. cache servers detect client joins when the
* client connects to the server.
*/
@Test
public void testClientMembershipEventsInServer() throws Exception {
final boolean[] fired = new boolean[3];
final DistributedMember[] member = new DistributedMember[3];
final String[] memberId = new String[3];
final boolean[] isClient = new boolean[3];
// create and register ClientMembershipListener in controller vm...
ClientMembershipListener listener = new ClientMembershipListener() {
@Override
public void memberJoined(ClientMembershipEvent event) {
System.out.println("[testClientMembershipEventsInServer] memberJoined: " + event);
fired[JOINED] = true;
member[JOINED] = event.getMember();
memberId[JOINED] = event.getMemberId();
isClient[JOINED] = event.isClient();
assertFalse(fired[LEFT] || fired[CRASHED]);
}
@Override
public void memberLeft(ClientMembershipEvent event) {
System.out.println("[testClientMembershipEventsInServer] memberLeft: " + event);
fired[LEFT] = true;
member[LEFT] = event.getMember();
memberId[LEFT] = event.getMemberId();
isClient[LEFT] = event.isClient();
assertFalse(fired[JOINED] || fired[CRASHED]);
}
@Override
public void memberCrashed(ClientMembershipEvent event) {
System.out.println("[testClientMembershipEventsInServer] memberCrashed: " + event);
fired[CRASHED] = true;
member[CRASHED] = event.getMember();
memberId[CRASHED] = event.getMemberId();
isClient[CRASHED] = event.isClient();
assertFalse(fired[JOINED] || fired[LEFT]);
}
};
ClientMembership.registerClientMembershipListener(listener);
final VM vm0 = Host.getHost(0).getVM(0);
final String name = getUniqueName();
final int[] ports = new int[1];
// create BridgeServer in controller vm...
System.out.println("[testClientMembershipEventsInServer] Create BridgeServer");
getSystem();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
Region region = createRegion(name, factory.create());
assertNotNull(region);
assertNotNull(getRootRegion().getSubregion(name));
ports[0] = startBridgeServer(0);
assertTrue(ports[0] != 0);
DistributedMember serverMember = getMemberId();
String serverMemberId = serverMember.toString();
System.out.println("[testClientMembershipEventsInServer] ports[0]=" + ports[0]);
System.out.println("[testClientMembershipEventsInServer] serverMemberId=" + serverMemberId);
System.out.println("[testClientMembershipEventsInServer] serverMember=" + serverMember);
assertFalse(fired[JOINED]);
assertNull(member[JOINED]);
assertNull(memberId[JOINED]);
assertFalse(isClient[JOINED]);
assertFalse(fired[LEFT]);
assertNull(member[LEFT]);
assertNull(memberId[LEFT]);
assertFalse(isClient[LEFT]);
assertFalse(fired[CRASHED]);
assertNull(member[CRASHED]);
assertNull(memberId[CRASHED]);
assertFalse(isClient[CRASHED]);
// sanity check...
System.out.println("[testClientMembershipEventsInServer] sanity check");
DistributedMember test = new TestDistributedMember("test");
InternalClientMembership.notifyClientJoined(test);
await().timeout(300, TimeUnit.SECONDS).until(() -> {
return fired[JOINED] || fired[LEFT] || fired[CRASHED];
});
assertTrue(fired[JOINED]);
assertEquals(test, member[JOINED]);
assertEquals(test.getId(), memberId[JOINED]);
assertTrue(isClient[JOINED]);
assertFalse(fired[LEFT]);
assertNull(member[LEFT]);
assertNull(memberId[LEFT]);
assertFalse(isClient[LEFT]);
assertFalse(fired[CRASHED]);
assertNull(member[CRASHED]);
assertNull(memberId[CRASHED]);
assertFalse(isClient[CRASHED]);
resetArraysForTesting(fired, member, memberId, isClient);
final Host host = Host.getHost(0);
SerializableCallable createConnectionPool = new SerializableCallable("Create connectionPool") {
@Override
public Object call() {
System.out.println("[testClientMembershipEventsInServer] create bridge client");
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");
config.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, "false");
properties = config;
DistributedSystem s = getSystem(config);
AttributesFactory factory = new AttributesFactory();
Pool pool = ClientServerTestCase.configureConnectionPool(factory,
NetworkUtils.getServerHostName(host), ports, true, -1, 2, null);
createRegion(name, factory.create());
assertNotNull(getRootRegion().getSubregion(name));
assertTrue(s == basicGetSystem()); // see geode-1078
return getMemberId();
}
};
// create bridge client in vm0...
DistributedMember clientMember = (DistributedMember) vm0.invoke(createConnectionPool);
String clientMemberId = clientMember.toString();
await().timeout(300, TimeUnit.SECONDS).until(() -> {
return fired[JOINED] || fired[LEFT] || fired[CRASHED];
});
System.out.println("[testClientMembershipEventsInServer] assert server detected client join");
assertTrue(fired[JOINED]);
assertEquals(member[JOINED] + " should equal " + clientMember, clientMember, member[JOINED]);
assertEquals(memberId[JOINED] + " should equal " + clientMemberId, clientMemberId,
memberId[JOINED]);
assertTrue(isClient[JOINED]);
assertFalse(fired[LEFT]);
assertNull(member[LEFT]);
assertNull(memberId[LEFT]);
assertFalse(isClient[LEFT]);
assertFalse(fired[CRASHED]);
assertNull(member[CRASHED]);
assertNull(memberId[CRASHED]);
assertFalse(isClient[CRASHED]);
resetArraysForTesting(fired, member, memberId, isClient);
pauseForClientToJoin();
vm0.invoke(new SerializableRunnable("Stop bridge client") {
@Override
public void run() {
System.out.println("[testClientMembershipEventsInServer] Stop bridge client");
getRootRegion().getSubregion(name).close();
Map m = PoolManager.getAll();
for (final Object o : m.values()) {
Pool p = (Pool) o;
p.destroy();
}
}
});
await().timeout(300, TimeUnit.SECONDS).until(() -> {
return fired[JOINED] || fired[LEFT] || fired[CRASHED];
});
System.out.println("[testClientMembershipEventsInServer] assert server detected client left");
assertFalse(fired[JOINED]);
assertNull(member[JOINED]);
assertNull(memberId[JOINED]);
assertFalse(isClient[JOINED]);
assertTrue(fired[LEFT]);
assertEquals(clientMember, member[LEFT]);
assertEquals(clientMemberId, memberId[LEFT]);
assertTrue(isClient[LEFT]);
assertFalse(fired[CRASHED]);
assertNull(member[CRASHED]);
assertNull(memberId[CRASHED]);
assertFalse(isClient[CRASHED]);
resetArraysForTesting(fired, member, memberId, isClient);
// reconnect bridge client to test for crashed event
clientMemberId = vm0.invoke(createConnectionPool).toString();
await().timeout(300, TimeUnit.SECONDS).until(() -> {
return fired[JOINED] || fired[LEFT] || fired[CRASHED];
});
System.out
.println("[testClientMembershipEventsInServer] assert server detected client re-join");
assertTrue(fired[JOINED]);
assertEquals(clientMember, member[JOINED]);
assertEquals(clientMemberId, memberId[JOINED]);
assertTrue(isClient[JOINED]);
assertFalse(fired[LEFT]);
assertNull(member[LEFT]);
assertNull(memberId[LEFT]);
assertFalse(isClient[LEFT]);
assertFalse(fired[CRASHED]);
assertNull(member[CRASHED]);
assertNull(memberId[CRASHED]);
assertFalse(isClient[CRASHED]);
resetArraysForTesting(fired, member, memberId, isClient);
pauseForClientToJoin();
ServerConnection.setForceClientCrashEvent(true);
try {
vm0.invoke(new SerializableRunnable("Stop bridge client") {
@Override
public void run() {
System.out.println("[testClientMembershipEventsInServer] Stop bridge client");
getRootRegion().getSubregion(name).close();
Map m = PoolManager.getAll();
for (final Object o : m.values()) {
Pool p = (Pool) o;
p.destroy();
}
}
});
await().timeout(300, TimeUnit.SECONDS).until(() -> {
return fired[JOINED] || fired[LEFT] || fired[CRASHED];
});
System.out
.println("[testClientMembershipEventsInServer] assert server detected client crashed");
assertFalse(fired[JOINED]);
assertNull(member[JOINED]);
assertNull(memberId[JOINED]);
assertFalse(isClient[JOINED]);
assertFalse(fired[LEFT]);
assertNull(member[LEFT]);
assertNull(memberId[LEFT]);
assertFalse(isClient[LEFT]);
assertTrue(fired[CRASHED]);
assertEquals(clientMember, member[CRASHED]);
assertEquals(clientMemberId, memberId[CRASHED]);
assertTrue(isClient[CRASHED]);
} finally {
ServerConnection.setForceClientCrashEvent(false);
}
}
/**
* The joined event fires when the first client handshake is processed. This pauses long enough to
* allow the rest of the client sockets to complete handshaking before making the client leave.
* Without this pause, subsequent socket handshakes could fire join events after the departure
* event, followed by another departure event. If you see failures in
* testClientMembershipEventsInServer, try increasing the length of this pause.
*/
private void pauseForClientToJoin() {
  // Fixed settle time handed straight to Wait.pause (presumably milliseconds - confirm against
  // Wait). Raise this value if late client handshakes still produce out-of-order events.
  final int settleTime = 2000;
  Wait.pause(settleTime);
}
/**
* Tests registration and event notification in conjunction with disconnecting and reconnecting to
* DistributedSystem.
*/
@Test
public void testLifecycle() throws Exception {
  // Capture slots indexed by event type; this test only ever fills the JOINED slot.
  final boolean[] fired = new boolean[3];
  final DistributedMember[] member = new DistributedMember[3];
  final String[] memberId = new String[3];
  final boolean[] isClient = new boolean[3];

  // create and register ClientMembershipListener in controller vm...
  // The listener checks that the JOINED slot is still empty and then records the event in it;
  // left and crashed events are ignored.
  ClientMembershipListener joinRecorder = new ClientMembershipListener() {
    @Override
    public void memberLeft(ClientMembershipEvent event) {}

    @Override
    public void memberCrashed(ClientMembershipEvent event) {}

    @Override
    public void memberJoined(ClientMembershipEvent event) {
      assertFalse(fired[JOINED]);
      assertNull(member[JOINED]);
      assertNull(memberId[JOINED]);
      assertFalse(isClient[JOINED]);
      fired[JOINED] = true;
      member[JOINED] = event.getMember();
      memberId[JOINED] = event.getMemberId();
      isClient[JOINED] = event.isClient();
    }
  };
  ClientMembership.registerClientMembershipListener(joinRecorder);

  // create loner in controller vm...
  Properties lonerConfig = new Properties();
  lonerConfig.setProperty(MCAST_PORT, "0");
  lonerConfig.setProperty(LOCATORS, "");
  properties = lonerConfig;
  getSystem(lonerConfig);

  // Phase 1: assert that event is fired while connected
  InternalClientMembership.notifyServerJoined(serverLocation);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[JOINED]);
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertFalse(isClient[JOINED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // Phase 2: assert that event is NOT fired while disconnected
  disconnectFromDS();
  InternalClientMembership.notifyServerJoined(serverLocation);
  // NOTE(review): the condition is constantly true, so this await is not tied to any event;
  // the negative assertions below only cover whatever was delivered by the time it returns.
  await().until(() -> true);
  assertFalse(fired[JOINED]);
  assertNull(member[JOINED]);
  assertNull(memberId[JOINED]);
  assertFalse(isClient[JOINED]);
  resetArraysForTesting(fired, member, memberId, isClient);

  // Phase 3: assert that event is fired again after reconnecting
  properties = lonerConfig;
  assertTrue(getSystem(lonerConfig).isConnected());
  InternalClientMembership.notifyServerJoined(serverLocation);
  await().timeout(300, TimeUnit.SECONDS).until(() -> fired[JOINED]);
  assertTrue(fired[JOINED]);
  assertNotNull(member[JOINED]);
  assertFalse(isClient[JOINED]);
}
/**
* Starts up a server in the controller vm and 4 clients (one in each dunit vm), then calls and
* tests ClientMembership.getConnectedClients().
*/
@Test
public void testGetConnectedClients() throws Exception {
  final String name = getUniqueName();
  final int[] ports = new int[1];
  IgnoredException.addIgnoredException("ConnectException");

  // create BridgeServer in controller vm...
  System.out.println("[testGetConnectedClients] Create BridgeServer");
  getSystem();
  AttributesFactory serverFactory = new AttributesFactory();
  serverFactory.setScope(Scope.LOCAL);
  Region region = createRegion(name, serverFactory.create());
  assertNotNull(region);
  assertNotNull(getRootRegion().getSubregion(name));
  ports[0] = startBridgeServer(0);
  assertTrue(ports[0] != 0);
  String serverMemberId = getSystem().getDistributedMember().toString();
  System.out.println("[testGetConnectedClients] ports[0]=" + ports[0]);
  System.out.println("[testGetConnectedClients] serverMemberId=" + serverMemberId);

  final Host host = Host.getHost(0);
  // Callable run in each dunit vm: connects with empty locators, creates a pool-backed region
  // pointing at the server port above and returns that vm's member id.
  SerializableCallable createPool = new SerializableCallable("Create connection pool") {
    @Override
    public Object call() {
      System.out.println("[testGetConnectedClients] create bridge client");
      properties = new Properties();
      properties.setProperty(MCAST_PORT, "0");
      properties.setProperty(LOCATORS, "");
      getSystem(properties);
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      Pool p = ClientServerTestCase.configureConnectionPool(factory,
          NetworkUtils.getServerHostName(host), ports, true, -1, -1, null);
      createRegion(name, factory.create());
      assertNotNull(getRootRegion().getSubregion(name));
      assertTrue(p.getServers().size() > 0);
      return getMemberId();
    }
  };

  // create a bridge client in every dunit vm and remember its member id string...
  final int vmCount = host.getVMCount();
  final String[] clientMemberIdArray = new String[vmCount];
  for (int i = 0; i < vmCount; i++) {
    final VM vm = Host.getHost(0).getVM(i);
    System.out.println("creating pool in vm_" + i);
    clientMemberIdArray[i] = vm.invoke(createPool).toString();
  }
  Collection clientMemberIds = Arrays.asList(clientMemberIdArray);

  // wait until the server reports exactly as many connected clients as were created
  final int expectedClientCount = clientMemberIds.size();
  await().timeout(300, TimeUnit.SECONDS).until(() -> {
    Map snapshot = InternalClientMembership.getConnectedClients(false, getCache());
    return snapshot != null && snapshot.size() == expectedClientCount;
  });

  Map connectedClients = InternalClientMembership.getConnectedClients(false, getCache());
  assertNotNull(connectedClients);
  assertEquals(clientMemberIds.size(), connectedClients.size());
  System.out
      .println("connectedClients: " + connectedClients + "; clientMemberIds: " + clientMemberIds);

  // every reported client id must be one of the ids returned by the client vms
  for (final Object entryObject : connectedClients.entrySet()) {
    Map.Entry entry = (Map.Entry) entryObject;
    String connectedClient = (String) entry.getKey();
    System.out.println("[testGetConnectedClients] checking for client " + connectedClient);
    assertTrue(clientMemberIds.contains(connectedClient));
    Object[] result = (Object[]) entry.getValue();
    String summary;
    if (result == null) {
      summary = "none";
    } else {
      summary = result[0] + "; connections=" + result[1];
    }
    System.out.println("[testGetConnectedClients] result: " + summary);
  }
}
/**
* Starts up 4 servers and the controller vm as a client, then calls and tests
* ClientMembership.getConnectedServers().
*/
@Test
public void testGetConnectedServers() throws Exception {
  // Outline: start one cache server in every dunit vm, make this vm a client with one pool per
  // server, then wait until InternalClientMembership.getConnectedServers() reports them all.
  final Host host = Host.getHost(0);
  final String name = getUniqueName();
  final int vmCount = host.getVMCount();
  final int[] ports = new int[vmCount];
  for (int i = 0; i < vmCount; i++) {
    final int whichVM = i;
    final VM vm = host.getVM(i);
    vm.invoke("Create cache server", () -> {
      // create BridgeServer in the invoked vm...
      System.out.println("[testGetConnectedServers] Create BridgeServer");
      getSystem();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      Region region = createRegion(name + "_" + whichVM, factory.create());
      assertNotNull(region);
      assertNotNull(getRootRegion().getSubregion(name + "_" + whichVM));
      region.put("KEY-1", "VAL-1");
      try {
        testGetConnectedServers_port = startBridgeServer(0);
      } catch (IOException e) {
        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
            .error("startBridgeServer threw IOException", e);
        fail("startBridgeServer threw IOException " + e.getMessage());
      }
      assertTrue(testGetConnectedServers_port != 0);
      // Log the port that was just bound. ports[whichVM] is only assigned after this invocation
      // returns, so printing it here (as the code used to) always showed 0.
      System.out.println("[testGetConnectedServers] port=" + testGetConnectedServers_port);
      System.out.println(
          "[testGetConnectedServers] serverMemberId=" + getSystem().getDistributedMember());
    });
    // read the bound port back from the vm through the static accessor
    ports[whichVM] = vm.invoke("getTestGetConnectedServers_port",
        ClientMembershipDUnitTest::getTestGetConnectedServers_port);
    assertTrue(ports[whichVM] != 0);
  }

  System.out.println("[testGetConnectedServers] create bridge client");
  Properties config = new Properties();
  config.setProperty(MCAST_PORT, "0");
  config.setProperty(LOCATORS, "");
  properties = config;
  getSystem(config);
  getCache();
  AttributesFactory factory = new AttributesFactory();
  factory.setScope(Scope.LOCAL);

  // one named pool ("pooly<i>") and one region per server, each touched with a get
  for (int i = 0; i < ports.length; i++) {
    System.out.println("[testGetConnectedServers] creating connectionpool for "
        + NetworkUtils.getServerHostName(host) + " " + ports[i]);
    int[] thisServerPorts = new int[] {ports[i]};
    configureConnectionPoolWithNameAndFactory(factory, NetworkUtils.getServerHostName(host),
        thisServerPorts, false, -1,
        -1, null, "pooly" + i, PoolManager.createFactory(), -1, -1, -2,
        -1);
    Region region = createRegion(name + "_" + i, factory.create());
    assertNotNull(getRootRegion().getSubregion(name + "_" + i));
    region.get("KEY-1");
  }

  // wait until both the pool count and the connected-server count match the vm count
  final int expectedVMCount = vmCount;
  await().timeout(300, TimeUnit.SECONDS).until(() -> {
    if (PoolManager.getAll().size() != expectedVMCount) {
      return false;
    }
    Map connectedServers = InternalClientMembership.getConnectedServers();
    if (connectedServers == null) {
      return false;
    }
    return connectedServers.size() == expectedVMCount;
  });

  assertEquals(vmCount, PoolManager.getAll().size());
  Map connectedServers = InternalClientMembership.getConnectedServers();
  assertNotNull(connectedServers);
  assertEquals(vmCount, connectedServers.size());
  for (final Object o : connectedServers.keySet()) {
    String connectedServer = (String) o;
    System.out.println("[testGetConnectedServers] value for connectedServer: "
        + connectedServers.get(connectedServer));
  }
}
// Port returned by startBridgeServer inside the "Create cache server" invocation of
// testGetConnectedServers; read back by that test through the accessor below.
protected static int testGetConnectedServers_port;
// Accessor passed as a method reference to vm.invoke by testGetConnectedServers.
private static int getTestGetConnectedServers_port() {
return testGetConnectedServers_port;
}
@Override
public Properties getDistributedSystemProperties() {
  // Reuse whatever a test has already stored in the shared properties field.
  if (properties != null) {
    return properties;
  }
  // Nothing stored yet: create the default set, with network partition detection set to
  // "false", and keep it in the field for later calls.
  properties = new Properties();
  properties.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "false");
  return properties;
}
/**
* Tests getConnectedClients(boolean onlyClientsNotifiedByThisServer) where
* onlyClientsNotifiedByThisServer is true.
*/
@Test
public void testGetNotifiedClients() throws Exception {
  // Outline: start one cache server in every dunit vm, connect this vm as a client that
  // registers interest, then ask every server for the clients it notifies. The final count
  // assertion is currently disabled (see the comments at the end of the method).
  final Host host = Host.getHost(0);
  final String name = getUniqueName();
  final int[] ports = new int[host.getVMCount()];
  for (int i = 0; i < host.getVMCount(); i++) {
    final int whichVM = i;
    final VM vm = Host.getHost(0).getVM(i);
    vm.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        // create BridgeServer in the invoked vm...
        System.out.println("[testGetNotifiedClients] Create BridgeServer");
        getSystem();
        AttributesFactory factory = new AttributesFactory();
        Region region = createRegion(name, factory.create());
        assertNotNull(region);
        assertNotNull(getRootRegion().getSubregion(name));
        region.put("KEY-1", "VAL-1");
        try {
          testGetNotifiedClients_port = startBridgeServer(0);
        } catch (IOException e) {
          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
              .error("startBridgeServer threw IOException", e);
          fail("startBridgeServer threw IOException " + e.getMessage());
        }
        assertTrue(testGetNotifiedClients_port != 0);
        // Log the port that was just bound. ports[whichVM] is only assigned after this
        // invocation returns, so printing it here (as the code used to) always showed 0.
        System.out.println("[testGetNotifiedClients] port=" + testGetNotifiedClients_port);
        System.out.println("[testGetNotifiedClients] serverMemberId=" + getMemberId());
      }
    });
    // read the bound port back from the vm through the static accessor
    ports[whichVM] = vm.invoke("getTestGetNotifiedClients_port",
        ClientMembershipDUnitTest::getTestGetNotifiedClients_port);
    assertTrue(ports[whichVM] != 0);
  }

  System.out.println("[testGetNotifiedClients] create bridge client");
  Properties config = new Properties();
  config.setProperty(MCAST_PORT, "0");
  config.setProperty(LOCATORS, "");
  properties = config;
  getSystem();
  getCache();
  AttributesFactory factory = new AttributesFactory();
  factory.setScope(Scope.LOCAL);
  System.out.println("[testGetNotifiedClients] creating connection pool");
  ClientServerTestCase.configureConnectionPool(factory, NetworkUtils.getServerHostName(host),
      ports, true, -1, -1, null);
  Region region = createRegion(name, factory.create());
  assertNotNull(getRootRegion().getSubregion(name));
  region.registerInterest("KEY-1");
  region.get("KEY-1");
  // Only referenced by the disabled assertion inside the invocation below; the getMemberId()
  // call is kept as it was.
  final String clientMemberId = getMemberId().toString();
  pauseForClientToJoin();

  // assertions go here
  int[] clientCounts = new int[host.getVMCount()];
  // only one server vm will have that client for updating
  for (int i = 0; i < host.getVMCount(); i++) {
    final int whichVM = i;
    final VM vm = Host.getHost(0).getVM(i);
    // This invocation only counts the notified clients; it was previously mislabelled
    // "Create cache server".
    vm.invoke("Count notified clients", () -> {
      Map clients = InternalClientMembership.getConnectedClients(true, getCache());
      assertNotNull(clients);
      testGetNotifiedClients_clientCount = clients.size();
      // [bruce] this is not a valid assertion - the server may not use
      // fully qualified host names while clients always use them in
      // forming their member ID. The test needs to check InetAddresses,
      // not strings
      // if (testGetNotifiedClients_clientCount > 0) {
      // // assert that the clientMemberId matches
      // assertEquals(clientMemberId, clients.keySet().iterator().next());
      // }
    });
    clientCounts[whichVM] = vm.invoke("getTestGetNotifiedClients_clientCount",
        ClientMembershipDUnitTest::getTestGetNotifiedClients_clientCount);
  }

  // only one server should have a notifier for this client...
  int totalClientCounts = 0;
  for (final int clientCount : clientCounts) {
    totalClientCounts += clientCount;
  }
  // this assertion fails because the count is 4
  // assertIndexDetailsEquals(1, totalClientCounts);
}
// Port returned by startBridgeServer inside the "Create cache server" runnable of
// testGetNotifiedClients; read back by that test through the accessor below.
protected static int testGetNotifiedClients_port;
// Accessor passed as a method reference to vm.invoke by testGetNotifiedClients.
private static int getTestGetNotifiedClients_port() {
return testGetNotifiedClients_port;
}
// Size of the map returned by InternalClientMembership.getConnectedClients(true, ...) as
// stored by testGetNotifiedClients inside a vm.invoke; read back through the accessor below.
protected static int testGetNotifiedClients_clientCount;
// Accessor passed as a method reference to vm.invoke by testGetNotifiedClients.
private static int getTestGetNotifiedClients_clientCount() {
return testGetNotifiedClients_clientCount;
}
// Simple DistributedMember implementation
static class TestDistributedMember implements DistributedMember {
  /** The only state of this member; also serves as its id and unique id. */
  private final String host;

  public TestDistributedMember(String host) {
    this.host = host;
  }

  // ---- identity -------------------------------------------------------------------------

  @Override
  public String getHost() {
    return host;
  }

  @Override
  public String getId() {
    return host;
  }

  @Override
  public String getUniqueId() {
    return host;
  }

  /** Always the empty string. */
  @Override
  public String getName() {
    return "";
  }

  /** Always zero. */
  @Override
  public int getProcessId() {
    return 0;
  }

  // ---- fixed "empty" answers --------------------------------------------------------------

  /** Returns a new, empty set on every call. */
  @Override
  public Set getRoles() {
    return new HashSet();
  }

  @Override
  public List<String> getGroups() {
    return Collections.emptyList();
  }

  @Override
  public DurableClientAttributes getDurableClientAttributes() {
    return null;
  }

  // ---- ordering and equality, all based on the host string --------------------------------

  /**
   * Orders members by host name.
   *
   * @throws InternalGemFireException if {@code o} is null or not a TestDistributedMember
   */
  @Override
  public int compareTo(DistributedMember o) {
    // instanceof is false for null, so this single test rejects null as well
    if (!(o instanceof TestDistributedMember)) {
      throw new InternalGemFireException("Invalidly comparing TestDistributedMember to " + o);
    }
    TestDistributedMember other = (TestDistributedMember) o;
    return getHost().compareTo(other.getHost());
  }

  /** Two instances are equal when compareTo reports their hosts as equal. */
  @Override
  public boolean equals(Object obj) {
    if (obj instanceof TestDistributedMember) {
      return compareTo((TestDistributedMember) obj) == 0;
    }
    return false;
  }

  @Override
  public int hashCode() {
    return getHost().hashCode();
  }
}
}