blob: dffc2160af6cd0d98c441f4470af835c131e8e90 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionResponse;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
import com.gemstone.gemfire.cache.server.ServerLoad;
import com.gemstone.gemfire.cache.server.ServerLoadProbeAdapter;
import com.gemstone.gemfire.cache.server.ServerMetrics;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.distributed.internal.ServerLocator;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
*/
public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
/**
* The number of connections that we can be off by in the balancing tests
* We need this little fudge factor, because the locator can receive an update
* from the bridge server after it has made incremented its counter for a client
* connection, but the client hasn't connected yet. This wipes out the estimation
* on the locator. This means that we may be slighly off in our balance.
*
* TODO grid fix this hole in the locator.
*/
private static final int ALLOWABLE_ERROR_IN_COUNT = 1;
protected static final long MAX_WAIT = 60000;
public LocatorLoadBalancingDUnitTest(String name) {
super(name);
}
/**
* Test the locator discovers a bridge server and is initialized with
* the correct load for that bridge server.
*/
public void testDiscovery() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// vm0.invoke(new SerializableRunnable() {
// public void run() {
// System.setProperty("gemfire.DistributionAdvisor.VERBOSE", "true");
// }
// });
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(getServerHostName(vm0
.getHost()), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
checkLocatorLoad(vm0, expected);
int serverPort2 = startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
ServerLocation expectedLocation2 = new ServerLocation(getServerHostName(vm0
.getHost()), serverPort2);
expected.put(expectedLocation2, expectedLoad);
checkLocatorLoad(vm0, expected);
}
/**
* Test that the locator will properly estimate the load for servers when
* it receives connection requests.
*/
public void testEstimation() throws UnknownHostException, IOException, ClassNotFoundException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
ServerLoad expectedLoad = new ServerLoad(2/800f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(getServerHostName(host), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
ClientConnectionResponse response;
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
.getByName(getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000);
Assert.assertEquals(expectedLocation, response.getServer());
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
.getByName(getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true);
Assert.assertEquals(expectedLocation, response.getServer());
//we expect that the connection load load will be 2 * the loadPerConnection
checkLocatorLoad(vm0, expected);
QueueConnectionResponse response2;
response2 = (QueueConnectionResponse) TcpClient.requestToServer(InetAddress
.getByName(getServerHostName(host)), locatorPort,
new QueueConnectionRequest(null, 2,
Collections.EMPTY_SET, null, false), 10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
response2 = (QueueConnectionResponse) TcpClient
.requestToServer(InetAddress.getByName(getServerHostName(host)),
locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null,
false), 10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
//we expect that the queue load will increase by 2
expectedLoad.setSubscriptionConnectionLoad(2f);
checkLocatorLoad(vm0, expected);
}
/**
* Test to make sure the bridge servers communicate
* their updated load to the controller when the load
* on the bridge server changes.
*/
public void testLoadMessaging() {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
final int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
//We expect 0 load
Map expected = new HashMap();
ServerLocation expectedLocation = new ServerLocation(getServerHostName(host), serverPort);
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
checkLocatorLoad(vm0, expected);
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addServer(getServerHostName(host), serverPort);
pf.setMinConnections(8);
pf.setMaxConnections(8);
pf.setSubscriptionEnabled(true);
startBridgeClientInVM(vm2, pf.getPoolAttributes(), new String[] {REGION_NAME});
//We expect 8 client to server connections. The queue requires
//an additional client to server connection, but that shouldn't show up here.
expectedLoad = new ServerLoad(8/800f, 1 / 800.0f, 1f, 1f);
expected.put(expectedLocation, expectedLoad);
checkLocatorLoad(vm0, expected);
stopBridgeMemberVM(vm2);
//Now we expect 0 load
expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
checkLocatorLoad(vm0, expected);
}
/**
* Test to make sure that the locator
* balancing load between two servers.
*/
public void testBalancing() {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(getServerHostName(host), locatorPort);
pf.setMinConnections(80);
pf.setMaxConnections(80);
pf.setSubscriptionEnabled(false);
pf.setIdleTimeout(-1);
startBridgeClientInVM(vm3, pf.getPoolAttributes(), new String[] {REGION_NAME});
waitForPrefilledConnections(vm3, 80);
checkConnectionCount(vm1, 40);
checkConnectionCount(vm2, 40);
}
private void checkConnectionCount(VM vm, final int count) {
Runnable checkConnectionCount = new SerializableRunnable("checkConnectionCount") {
public void run() {
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
final BridgeServerImpl server = (BridgeServerImpl)
cache.getBridgeServers().get(0);
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
int sz = server.getAcceptor().getStats()
.getCurrentClientConnections();
if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
return true;
}
excuse = "Found " + sz + " connections, expected " + count;
return false;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 5 * 60 * 1000, 1000, true);
}
};
vm.invoke(checkConnectionCount);
}
private void waitForPrefilledConnections(VM vm, final int count) {
waitForPrefilledConnections(vm, count, POOL_NAME);
}
private void waitForPrefilledConnections(VM vm, final int count, final String poolName) {
SerializableRunnable runnable = new SerializableRunnable("waitForPrefilledConnections") {
public void run() {
final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return pool.getConnectionCount() >= count;
}
public String description() {
return "connection count never reached " + count;
}
};
DistributedTestCase.waitForCriterion(ev, MAX_WAIT, 200, true);
}
};
if(vm == null) {
runnable.run();
} else {
vm.invoke(runnable);
}
}
/** Test that the locator balances load between
* three servers with intersecting server groups.
* Server: 1 2 3
* Groups: a a,b b
*/
public void testIntersectingServerGroups() {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
int serverPort1 = startBridgeServerInVM(vm1, new String[] {"a"}, locators);
startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
startBridgeServerInVM(vm3, new String[] {"b"}, locators);
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(getServerHostName(host), locatorPort);
pf.setMinConnections(12);
pf.setSubscriptionEnabled(false);
pf.setServerGroup("a");
pf.setIdleTimeout(-1);
startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
waitForPrefilledConnections(null, 12);
checkConnectionCount(vm1, 6);
checkConnectionCount(vm2, 6);
checkConnectionCount(vm3, 0);
getLogWriter().info("pool1 prefilled");
PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory();
pf2.init(pf.getPoolAttributes());
pf2.setServerGroup("b");
PoolImpl pool2= (PoolImpl) pf2.create("testPool2");
waitForPrefilledConnections(null, 12, "testPool2");
// The load will not be perfect, because we created all of the connections
//for group A first.
checkConnectionCount(vm1, 6);
checkConnectionCount(vm2, 9);
checkConnectionCount(vm3, 9);
getLogWriter().info("pool2 prefilled");
ServerLocation location1 = new ServerLocation(getServerHostName(host), serverPort1);
PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME);
Assert.assertEquals("a", pool1.getServerGroup());
//Use up all of the pooled connections on pool1, and acquire 3 more
for(int i = 0; i < 15; i++) {
pool1.acquireConnection();
}
getLogWriter().info("aquired 15 connections in pool1");
//now the load should be equal
checkConnectionCount(vm1, 9);
checkConnectionCount(vm2, 9);
checkConnectionCount(vm3, 9);
//use up all of the pooled connections on pool2
for(int i = 0; i < 12; i++) {
pool2.acquireConnection();
}
getLogWriter().info("aquired 12 connections in pool2");
//interleave creating connections in both pools
for(int i = 0; i < 6; i++) {
pool1.acquireConnection();
pool2.acquireConnection();
}
getLogWriter().info("interleaved 6 connections from pool1 with 6 connections from pool2");
//The load should still be balanced
checkConnectionCount(vm1, 13);
checkConnectionCount(vm2, 13);
checkConnectionCount(vm3, 13);
}
public void testCustomLoadProbe() {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// VM vm3 = host.getVM(3);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
ServerLoad load1= new ServerLoad(.3f, .01f, .44f, 4564f);
ServerLoad load2= new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
int serverPort1 = startBridgeServerInVM(vm1, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load1 ));
int serverPort2 = startBridgeServerInVM(vm2, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load2 ));
HashMap expected = new HashMap();
ServerLocation l1 = new ServerLocation(getServerHostName(host), serverPort1);
ServerLocation l2 = new ServerLocation(getServerHostName(host), serverPort2);
expected.put(l1, load1);
expected.put(l2, load2);
checkLocatorLoad(vm0, expected);
load1.setConnectionLoad(25f);
changeLoad(vm1, load1);
load2.setSubscriptionConnectionLoad(3.5f);
changeLoad(vm2, load2);
checkLocatorLoad(vm0, expected);
load1 = new ServerLoad(1f, .1f, 0f, 1f);
load2 = new ServerLoad(2f, 5f, 0f, 2f);
expected.put(l1, load1);
expected.put(l2, load2);
changeLoad(vm1, load1);
changeLoad(vm2, load2);
checkLocatorLoad(vm0, expected);
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(getServerHostName(host), locatorPort);
pf.setMinConnections(20);
pf.setSubscriptionEnabled(true);
pf.setIdleTimeout(-1);
startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
waitForPrefilledConnections(null, 20);
//The first 10 connection should to go vm1, then 1 to vm2, then another 9 to vm1
//because have unequal values for loadPerConnection
checkConnectionCount(vm1, 19);
checkConnectionCount(vm2, 1);
}
public void checkLocatorLoad(VM vm, final Map expected) {
vm.invoke(new SerializableRunnable() {
public void run() {
List locators = Locator.getLocators();
Assert.assertEquals(1, locators.size());
InternalLocator locator = (InternalLocator) locators.get(0);
final ServerLocator sl = locator.getServerLocatorAdvisee();
InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return expected.equals(sl.getLoadMap());
}
public String description() {
return "load map never became equal to " + expected;
}
};
DistributedTestCase.waitForCriterion(ev, MAX_WAIT, 200, true);
}
});
}
private void changeLoad(VM vm, final ServerLoad newLoad) {
vm.invoke(new SerializableRunnable() {
public void run() {
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
BridgeServer server = (BridgeServer) cache.getBridgeServers().get(0);
MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
probe.setLoad(newLoad);
}
});
}
private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable {
private ServerLoad load;
public MyLoadProbe(ServerLoad load) {
this.load = load;
}
public ServerLoad getLoad(ServerMetrics metrics) {
float connectionLoad = load.getConnectionLoad()
+ metrics.getConnectionCount() * load.getLoadPerConnection();
float queueLoad = load.getSubscriptionConnectionLoad() + metrics.getSubscriptionConnectionCount()
* load.getLoadPerSubscriptionConnection();
return new ServerLoad(connectionLoad, load.getLoadPerConnection(),
queueLoad, load.getLoadPerSubscriptionConnection());
}
public void setLoad(ServerLoad load) {
this.load = load;
}
}
}