blob: 6cc631059e0d0977ecb8f53f714d3737a39996f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest;
import org.apache.geode.cache.client.internal.locator.ClientConnectionResponse;
import org.apache.geode.cache.client.internal.locator.QueueConnectionRequest;
import org.apache.geode.cache.client.internal.locator.QueueConnectionResponse;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.server.ServerLoad;
import org.apache.geode.cache.server.ServerLoadProbeAdapter;
import org.apache.geode.cache.server.ServerMetrics;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.ServerLocator;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category({ClientServerTest.class})
public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
/**
* The number of connections that we can be off by in the balancing tests We need this little
* fudge factor, because the locator can receive an update from the cache server after it has
* made incremented its counter for a client connection, but the client hasn't connected yet. This
* wipes out the estimation on the locator. This means that we may be slighly off in our balance.
* <p>
* TODO grid fix this hole in the locator.
*/
private static final int ALLOWABLE_ERROR_IN_COUNT = 1;
protected static final long MAX_WAIT = 60000;
public LocatorLoadBalancingDUnitTest() {
super();
}
/**
* Test the locator discovers a cache server and is initialized with the correct load for that
* cache server.
*/
@Test
public void testDiscovery() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
String hostName = NetworkUtils.getServerHostName();
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
int serverPort = vm1.invoke("Start BridgeServer",
() -> startBridgeServer(new String[] {"a", "b"}, locators));
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(hostName, serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
int serverPort2 = vm2.invoke("Start BridgeServer",
() -> startBridgeServer(new String[] {"a", "b"}, locators));
ServerLocation expectedLocation2 = new ServerLocation(hostName, serverPort2);
expected.put(expectedLocation2, expectedLoad);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
/**
* Test that the locator will properly estimate the load for servers when it receives connection
* requests.
*/
@Test
public void testEstimation() throws IOException, ClassNotFoundException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
String hostName = NetworkUtils.getServerHostName();
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
int serverPort = vm1.invoke("Start BridgeServer",
() -> startBridgeServer(new String[] {"a", "b"}, locators));
ServerLoad expectedLoad = new ServerLoad(2 / 800f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(hostName, serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
SocketCreatorFactory.setDistributionConfig(new DistributionConfigImpl(new Properties()));
ClientConnectionResponse response;
response =
(ClientConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000);
Assert.assertEquals(expectedLocation, response.getServer());
response =
(ClientConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true);
Assert.assertEquals(expectedLocation, response.getServer());
// we expect that the connection load load will be 2 * the loadPerConnection
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
QueueConnectionResponse response2;
response2 =
(QueueConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new QueueConnectionRequest(null, 2, Collections.EMPTY_SET, null, false),
10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
response2 =
(QueueConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null, false),
10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
// we expect that the queue load will increase by 2
expectedLoad.setSubscriptionConnectionLoad(2f);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
/**
* Test to make sure the cache servers communicate their updated load to the controller when the
* load on the cache server changes.
*
*/
@Test
public void testLoadMessaging() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
String hostName = NetworkUtils.getServerHostName();
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
final int serverPort = vm1.invoke("Start BridgeServer",
() -> startBridgeServer(new String[] {"a", "b"}, locators));
// We expect 0 load
Map expected = new HashMap();
ServerLocation expectedLocation = new ServerLocation(hostName, serverPort);
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
vm2.invoke("StartBridgeClient", () -> {
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addServer(NetworkUtils.getServerHostName(host), serverPort);
pf.setMinConnections(8);
pf.setMaxConnections(8);
pf.setSubscriptionEnabled(true);
startBridgeClient(pf.getPoolAttributes(), new String[] {REGION_NAME});
return null;
});
// We expect 8 client to server connections. The queue requires
// an additional client to server connection, but that shouldn't show up here.
expectedLoad = new ServerLoad(8 / 800f, 1 / 800.0f, 1f, 1f);
expected.put(expectedLocation, expectedLoad);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
stopBridgeMemberVM(vm2);
// Now we expect 0 load
expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
/**
* Test to make sure that the locator balancing load between two servers.
*
*/
@Test
public void testBalancing() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
String hostName = NetworkUtils.getServerHostName();
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"a", "b"}, locators));
vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"a", "b"}, locators));
vm3.invoke("StartBridgeClient", () -> {
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(hostName, locatorPort);
pf.setMinConnections(80);
pf.setMaxConnections(80);
pf.setSubscriptionEnabled(false);
pf.setIdleTimeout(-1);
startBridgeClient(pf.getPoolAttributes(), new String[] {REGION_NAME});
return null;
});
vm3.invoke("waitForPrefilledConnections", () -> waitForPrefilledConnections(80));
vm1.invoke("check connection count", () -> checkConnectionCount(40));
vm2.invoke("check connection count", () -> checkConnectionCount(40));
}
private void checkConnectionCount(final int count) {
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
final CacheServerImpl server = (CacheServerImpl) cache.getCacheServers().get(0);
await().timeout(300, TimeUnit.SECONDS).until(() -> {
int sz = server.getAcceptor().getStats().getCurrentClientConnections();
if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
return true;
}
System.out.println("Found " + sz + " connections, expected " + count);
return false;
});
}
private void waitForPrefilledConnections(final int count) throws Exception {
waitForPrefilledConnections(count, POOL_NAME);
}
private void waitForPrefilledConnections(final int count, final String poolName)
throws Exception {
final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
await().timeout(300, TimeUnit.SECONDS)
.until(() -> pool.getConnectionCount() >= count);
}
/**
* Test that the locator balances load between three servers with intersecting server groups.
* Server: 1 2 3 Groups: a a,b b
*
*/
@Test
public void testIntersectingServerGroups() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
String hostName = NetworkUtils.getServerHostName();
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
int serverPort1 =
vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"a"}, locators));
vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"a", "b"}, locators));
vm3.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"b"}, locators));
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(12);
pf.setSubscriptionEnabled(false);
pf.setServerGroup("a");
pf.setIdleTimeout(-1);
startBridgeClient(pf.getPoolAttributes(), new String[] {REGION_NAME});
waitForPrefilledConnections(12);
vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
vm2.invoke("Check Connection Count", () -> checkConnectionCount(6));
vm3.invoke("Check Connection Count", () -> checkConnectionCount(0));
LogWriterUtils.getLogWriter().info("pool1 prefilled");
PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory();
pf2.init(pf.getPoolAttributes());
pf2.setServerGroup("b");
PoolImpl pool2 = (PoolImpl) pf2.create("testPool2");
waitForPrefilledConnections(12, "testPool2");
// The load will not be perfect, because we created all of the connections
// for group A first.
vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
LogWriterUtils.getLogWriter().info("pool2 prefilled");
ServerLocation location1 =
new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME);
Assert.assertEquals("a", pool1.getServerGroup());
// Use up all of the pooled connections on pool1, and acquire 3 more
for (int i = 0; i < 15; i++) {
pool1.acquireConnection();
}
LogWriterUtils.getLogWriter().info("aquired 15 connections in pool1");
// now the load should be equal
vm1.invoke("Check Connection Count", () -> checkConnectionCount(9));
vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
// use up all of the pooled connections on pool2
for (int i = 0; i < 12; i++) {
pool2.acquireConnection();
}
LogWriterUtils.getLogWriter().info("aquired 12 connections in pool2");
// interleave creating connections in both pools
for (int i = 0; i < 6; i++) {
pool1.acquireConnection();
pool2.acquireConnection();
}
LogWriterUtils.getLogWriter()
.info("interleaved 6 connections from pool1 with 6 connections from pool2");
// The load should still be balanced
vm1.invoke("Check Connection Count", () -> checkConnectionCount(13));
vm2.invoke("Check Connection Count", () -> checkConnectionCount(13));
vm3.invoke("Check Connection Count", () -> checkConnectionCount(13));
}
@Test
public void testCustomLoadProbe() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// VM vm3 = host.getVM(3);
String hostName = NetworkUtils.getServerHostName(vm0.getHost());
int locatorPort = vm0.invoke("Start Locator", () -> startLocator(hostName, ""));
String locators = getLocatorString(hostName, locatorPort);
final ServerLoad load1 = new ServerLoad(.3f, .01f, .44f, 4564f);
final ServerLoad load2 = new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(null, locators,
new String[] {REGION_NAME}, new MyLoadProbe(load1), false));
int serverPort2 = vm2.invoke("Start BridgeServer", () -> startBridgeServer(null, locators,
new String[] {REGION_NAME}, new MyLoadProbe(load2), false));
HashMap expected = new HashMap();
ServerLocation l1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
ServerLocation l2 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort2);
expected.put(l1, load1);
expected.put(l2, load2);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
load1.setConnectionLoad(25f);
vm1.invoke("changeLoad", () -> changeLoad(load1));
load2.setSubscriptionConnectionLoad(3.5f);
vm2.invoke("changeLoad", () -> changeLoad(load2));
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
final ServerLoad load1Updated = new ServerLoad(1f, .1f, 0f, 1f);
final ServerLoad load2Updated = new ServerLoad(2f, 5f, 0f, 2f);
expected.put(l1, load1Updated);
expected.put(l2, load2Updated);
vm1.invoke("changeLoad", () -> changeLoad(load1Updated));
vm2.invoke("changeLoad", () -> changeLoad(load2Updated));
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(20);
pf.setSubscriptionEnabled(true);
pf.setIdleTimeout(-1);
startBridgeClient(pf.getPoolAttributes(), new String[] {REGION_NAME});
waitForPrefilledConnections(20);
// The first 10 connection should to go vm1, then 1 to vm2, then another 9 to vm1
// because have unequal values for loadPerConnection
vm1.invoke("Check Connection Count", () -> checkConnectionCount(19));
vm2.invoke("Check Connection Count", () -> checkConnectionCount(1));
}
public void checkLocatorLoad(final Map expected) {
List locators = Locator.getLocators();
Assert.assertEquals(1, locators.size());
InternalLocator locator = (InternalLocator) locators.get(0);
final ServerLocator sl = locator.getServerLocatorAdvisee();
sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
await().timeout(300, TimeUnit.SECONDS)
.until(() -> expected.equals(sl.getLoadMap()));
}
private void changeLoad(final ServerLoad newLoad) {
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
CacheServer server = cache.getCacheServers().get(0);
MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
probe.setLoad(newLoad);
}
private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable {
private ServerLoad load;
public MyLoadProbe(ServerLoad load) {
this.load = load;
}
@Override
public ServerLoad getLoad(ServerMetrics metrics) {
float connectionLoad =
load.getConnectionLoad() + metrics.getConnectionCount() * load.getLoadPerConnection();
float queueLoad = load.getSubscriptionConnectionLoad()
+ metrics.getSubscriptionConnectionCount() * load.getLoadPerSubscriptionConnection();
return new ServerLoad(connectionLoad, load.getLoadPerConnection(), queueLoad,
load.getLoadPerSubscriptionConnection());
}
public void setLoad(ServerLoad load) {
this.load = load;
}
}
}