// blob: c3d60a573ffe368c310e77e6786c5ba9d690794f [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.test.PortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
/**
 * Tests for {@link ModularLoadManagerImpl}.
 *
 * <p>
 * Before every test method a local ZooKeeper/BookKeeper ensemble and two brokers ("primary" and "secondary") that
 * point at that ensemble are started; everything is torn down again after the method.
 */
public class ModularLoadManagerImplTest {
    private LocalBookkeeperEnsemble bkEnsemble;

    private URL url1;
    private PulsarService pulsar1;
    private PulsarAdmin admin1;

    private URL url2;
    private PulsarService pulsar2;
    private PulsarAdmin admin2;

    // Broker names in "<local host name>:<web service port>" form, as built in setup().
    private String primaryHost;
    private String secondaryHost;

    private NamespaceBundleFactory nsFactory;

    private ModularLoadManagerImpl primaryLoadManager;
    private ModularLoadManagerImpl secondaryLoadManager;

    // Created in setup() rather than here: shutdown() terminates it after every test method, so a pool created
    // once per instance would already be shut down when a later test method runs.
    private ExecutorService executor;

    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
    private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
    private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
    private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
    private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort();

    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImplTest.class);

    static {
        System.setProperty("test.basePort", "16100");
    }

    /**
     * Reflectively invokes the first method called {@code methodName} that is declared on the instance's class or,
     * failing that, on one of its superclasses. Access modifiers are ignored. Because the lookup is by name only,
     * this is intended for non-overloaded methods.
     *
     * @param instance
     *            object to invoke the method on
     * @param methodName
     *            name of the method to invoke
     * @param args
     *            arguments passed to the method
     * @return whatever the invoked method returns
     * @throws IllegalArgumentException
     *             if no method with that name exists anywhere in the class hierarchy
     * @throws Exception
     *             if the reflective call fails
     */
    private static Object invokeSimpleMethod(final Object instance, final String methodName, final Object... args)
            throws Exception {
        for (Class<?> type = instance.getClass(); type != null; type = type.getSuperclass()) {
            for (Method method : type.getDeclaredMethods()) {
                if (method.getName().equals(methodName)) {
                    method.setAccessible(true);
                    return method.invoke(instance, args);
                }
            }
        }
        throw new IllegalArgumentException("Method not found: " + methodName);
    }

    /**
     * Locates a field by name on the instance's class or, failing that, on one of its superclasses, and makes it
     * accessible.
     *
     * @throws NoSuchFieldException
     *             if no field with that name exists anywhere in the class hierarchy
     */
    private static Field findField(final Object instance, final String fieldName) throws NoSuchFieldException {
        for (Class<?> type = instance.getClass(); type != null; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (field.getName().equals(fieldName)) {
                    field.setAccessible(true);
                    return field;
                }
            }
        }
        throw new NoSuchFieldException(fieldName);
    }

    /**
     * Reads the value of a (possibly private or inherited) field via reflection.
     *
     * @param instance
     *            object to read the field from
     * @param fieldName
     *            name of the field
     * @return current value of the field
     * @throws Exception
     *             if the field does not exist or cannot be read
     */
    private static Object getField(final Object instance, final String fieldName) throws Exception {
        return findField(instance, fieldName).get(instance);
    }

    /**
     * Overwrites the value of a (possibly private or inherited) field via reflection.
     *
     * @param instance
     *            object whose field is written
     * @param fieldName
     *            name of the field
     * @param value
     *            new value for the field
     * @throws Exception
     *             if the field does not exist or cannot be written
     */
    private static void setField(final Object instance, final String fieldName, final Object value) throws Exception {
        findField(instance, fieldName).set(instance, value);
    }

    /**
     * Starts the local bookkeeper ensemble and both brokers, then extracts each broker's
     * {@link ModularLoadManagerImpl} from the "loadManager" field of its load manager object.
     */
    @BeforeMethod
    void setup() throws Exception {
        // A fresh pool per test method; see the note on the field declaration.
        executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        // Start local bookkeeper ensemble
        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
        bkEnsemble.start();

        // Start broker 1
        ServiceConfiguration config1 = new ServiceConfiguration();
        config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config1.setClusterName("use");
        config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
        config1.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
        config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
        pulsar1 = new PulsarService(config1);
        pulsar1.start();

        primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
        url1 = new URL("http://127.0.0.1:" + PRIMARY_BROKER_WEBSERVICE_PORT);
        admin1 = new PulsarAdmin(url1, (Authentication) null);

        // Start broker 2
        ServiceConfiguration config2 = new ServiceConfiguration();
        config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config2.setClusterName("use");
        config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
        config2.setZookeeperServers("127.0.0.1:" + ZOOKEEPER_PORT);
        config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
        pulsar2 = new PulsarService(config2);
        secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
                SECONDARY_BROKER_WEBSERVICE_PORT);
        pulsar2.start();

        url2 = new URL("http://127.0.0.1:" + SECONDARY_BROKER_WEBSERVICE_PORT);
        admin2 = new PulsarAdmin(url2, (Authentication) null);

        primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
        secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
        nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
        Thread.sleep(100);
    }

    /**
     * Stops the executor, the admin clients, both brokers (secondary first) and finally the bookkeeper ensemble.
     */
    @AfterMethod
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        executor.shutdown();

        admin1.close();
        admin2.close();

        pulsar2.close();
        pulsar1.close();

        bkEnsemble.stop();
    }

    /**
     * Builds a bundle that spans the full hash range (both bounds closed) of the given namespace.
     */
    private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
        return nsFactory.getBundle(new NamespaceName(property, cluster, namespace),
                Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
                        BoundType.CLOSED));
    }

    /**
     * Shorthand for {@link #makeBundle(String, String, String)} that uses the same string as property, cluster and
     * namespace.
     */
    private NamespaceBundle makeBundle(final String all) {
        return makeBundle(all, all, all);
    }

    /**
     * Checks that assignments are spread over both brokers while both are available, and that after the secondary
     * broker is disabled it disappears from the primary's load data and receives no further assignments.
     */
    @Test
    public void testCandidateConsistency() throws Exception {
        boolean foundFirst = false;
        boolean foundSecond = false;
        // After 2 selections, the load balancer should select both brokers due to preallocation.
        for (int i = 0; i < 2; ++i) {
            final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
            final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
            if (broker.equals(primaryHost)) {
                foundFirst = true;
            } else {
                foundSecond = true;
            }
        }
        assert foundFirst && foundSecond : "Expected both brokers to be selected: primary selected=" + foundFirst
                + ", secondary selected=" + foundSecond;

        // Now disable the secondary broker.
        secondaryLoadManager.disableBroker();

        LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");

        // Give some time for the watch to fire.
        Thread.sleep(500);

        // Make sure the second broker is not in the internal map.
        assert !loadData.getBrokerData().containsKey(secondaryHost) : "Disabled broker " + secondaryHost
                + " is still present in the primary's broker data";

        // Try 5 more selections, ensure they all go to the first broker.
        for (int i = 2; i < 7; ++i) {
            final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
            final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
            assert broker.equals(primaryHost) : "Selection " + i + " went to " + broker + " instead of "
                    + primaryHost;
        }
    }

    /**
     * Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
     * metrics change by a percentage threshold.
     */
    @Test
    public void testNeedBrokerDataUpdate() throws Exception {
        final LocalBrokerData lastData = new LocalBrokerData();
        final LocalBrokerData currentData = new LocalBrokerData();
        final ServiceConfiguration conf = pulsar1.getConfiguration();
        // Set this manually in case the default changes.
        conf.setLoadBalancerReportUpdateThresholdPercentage(5);
        // Easier to test using an uninitialized ModularLoadManagerImpl.
        final ModularLoadManagerImpl loadManager = new ModularLoadManagerImpl();
        setField(loadManager, "lastData", lastData);
        setField(loadManager, "localData", currentData);
        setField(loadManager, "conf", conf);
        final Supplier<Boolean> needUpdate = () -> {
            try {
                return (Boolean) invokeSimpleMethod(loadManager, "needBrokerDataUpdate");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        lastData.setMsgRateIn(100);
        currentData.setMsgRateIn(104);
        // 4% difference: shouldn't trigger an update.
        assert !needUpdate.get() : "msgRateIn 100 -> 104 should not trigger an update";
        currentData.setMsgRateIn(105.1);
        // Just over 5% difference: should trigger an update (exactly 5% is flaky due to precision).
        assert needUpdate.get() : "msgRateIn 100 -> 105.1 should trigger an update";

        // Do similar tests for lower values.
        currentData.setMsgRateIn(94);
        assert needUpdate.get() : "msgRateIn 100 -> 94 should trigger an update";
        currentData.setMsgRateIn(95.1);
        assert !needUpdate.get() : "msgRateIn 100 -> 95.1 should not trigger an update";

        // 0 to non-zero should always trigger an update.
        lastData.setMsgRateIn(0);
        currentData.setMsgRateIn(1e-8);
        assert needUpdate.get() : "msgRateIn 0 -> 1e-8 should trigger an update";

        // non-zero to zero should trigger an update as long as the threshold is less than 100.
        lastData.setMsgRateIn(1e-8);
        currentData.setMsgRateIn(0);
        assert needUpdate.get() : "msgRateIn 1e-8 -> 0 should trigger an update";

        // zero to zero should never trigger an update.
        currentData.setMsgRateIn(0);
        lastData.setMsgRateIn(0);
        assert !needUpdate.get() : "msgRateIn 0 -> 0 should not trigger an update";

        // CPU usage 100 -> 106 with a limit of 1000 is expected NOT to trigger an update.
        // NOTE(review): presumably resource usage is compared relative to the limit rather than as a relative
        // change of the raw usage value -- confirm against needBrokerDataUpdate.
        lastData.getCpu().usage = 100;
        lastData.getCpu().limit = 1000;
        currentData.getCpu().usage = 106;
        currentData.getCpu().limit = 1000;
        assert !needUpdate.get() : "cpu usage 100 -> 106 (limit 1000) should not trigger an update";

        // A much larger CPU change (100 -> 206 with a limit of 1000) is expected to trigger an update.
        lastData.getCpu().usage = 100;
        lastData.getCpu().limit = 1000;
        currentData.getCpu().usage = 206;
        currentData.getCpu().limit = 1000;
        assert needUpdate.get() : "cpu usage 100 -> 206 (limit 1000) should trigger an update";

        // Reset the CPU data so that only the metric under test differs from here on.
        lastData.setCpu(new ResourceUsage());
        currentData.setCpu(new ResourceUsage());

        // Minimally test the remaining metrics to ensure they are included.
        lastData.setMsgThroughputIn(100);
        currentData.setMsgThroughputIn(106);
        assert needUpdate.get() : "msgThroughputIn 100 -> 106 should trigger an update";
        currentData.setMsgThroughputIn(100);

        lastData.setNumBundles(100);
        currentData.setNumBundles(106);
        assert needUpdate.get() : "numBundles 100 -> 106 should trigger an update";

        // With every metric back in line with the last report, no update is needed.
        currentData.setNumBundles(100);
        assert !needUpdate.get() : "identical data should not trigger an update";
    }
}