blob: 0741f1aed441a01a33af7c69858ec540eeb35e57 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
import static org.junit.Assert.assertEquals;
public class TestRouterRefreshFairnessPolicyController {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
private final GenericTestUtils.LogCapturer controllerLog =
GenericTestUtils.LogCapturer.captureLogs(AbstractRouterRpcFairnessPolicyController.LOG);
private StateStoreDFSCluster cluster;
@BeforeClass
public static void setLogLevel() {
GenericTestUtils.setLogLevel(AbstractRouterRpcFairnessPolicyController.LOG, Level.DEBUG);
}
@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Before
public void setupCluster() throws Exception {
cluster = new StateStoreDFSCluster(false, 2);
Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();
// Handlers concurrent:ns0 = 3:3
conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 9);
// Allow metrics
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, true);
// Datanodes not needed for this test.
cluster.setNumDatanodesPerNameservice(0);
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
}
@Test
public void testRefreshNonexistentHandlerClass() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
"org.apache.hadoop.hdfs.server.federation.fairness.ThisControllerDoesNotExist");
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testRefreshClassDoesNotImplementControllerInterface() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf()
.set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, "java.lang.String");
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testRefreshSuccessful() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class.getCanonicalName());
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
NoRouterRpcFairnessPolicyController.class.getCanonicalName());
assertEquals(NoRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testConcurrentRefreshRequests() throws InterruptedException {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
controllerLog.clearOutput();
// Spawn 100 concurrent refresh requests
Thread[] threads = new Thread[100];
for (int i = 0; i < 100; i++) {
threads[i] = new Thread(() ->
client.refreshFairnessPolicyController(routerContext.getConf()));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
// There should be 100 controller shutdowns. All controllers created should be shut down.
assertEquals(100, StringUtils.countMatches(controllerLog.getOutput(),
"Shutting down router fairness policy controller"));
controllerLog.clearOutput();
}
@Test
public void testRefreshStaticChangeHandlers() throws Exception {
// Setup and mock
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
final long sleepTime = 3000;
Mockito.doAnswer(invocationOnMock -> {
Thread.sleep(sleepTime);
return null;
}).when(client)
.invokeMethod(Mockito.any(), Mockito.any(), Mockito.anyBoolean(),
Mockito.any(), Mockito.any(), Mockito.any());
// No calls yet
assertEquals("{}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
List<Thread> preRefreshInvocations = makeDummyInvocations(client, 4, "ns0");
Thread.sleep(2000);
// 3 permits acquired, calls will take 3s to finish and release permits
// 1 invocation rejected
assertEquals("{\"ns0\":3}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
assertEquals("{\"ns0\":1}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());
Configuration conf = routerContext.getConf();
final int newNs0Permits = 2;
final int newNs1Permits = 4;
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
Thread threadRefreshController = new Thread(() -> client.
refreshFairnessPolicyController(routerContext.getConf()));
threadRefreshController.start();
threadRefreshController.join();
// Wait for all dummy invocation threads to finish
for (Thread thread : preRefreshInvocations) {
thread.join();
}
// Controller should now have 2:4 handlers for ns0:ns1
// Make 4 calls to ns0 and 6 calls to ns1 so that each will fail twice
StaticRouterRpcFairnessPolicyController controller =
(StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController();
System.out.println(controller.getAvailableHandlerOnPerNs());
List<Thread> ns0Invocations = makeDummyInvocations(client, newNs0Permits + 2, "ns0");
List<Thread> ns1Invocations = makeDummyInvocations(client, newNs1Permits + 2, "ns1");
// Wait for these threads to finish
for (Thread thread : ns0Invocations) {
thread.join();
}
for (Thread thread : ns1Invocations) {
thread.join();
}
assertEquals("{\"ns0\":5,\"ns1\":4}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
assertEquals("{\"ns0\":3,\"ns1\":2}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());
}
private List<Thread> makeDummyInvocations(RouterRpcClient client, final int nThreads,
final String namespace) {
RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class);
List<Thread> threadAcquirePermits = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
Thread threadAcquirePermit = new Thread(() -> {
try {
client.invokeSingle(namespace, dummyMethod);
} catch (IOException e) {
e.printStackTrace();
}
});
threadAcquirePermits.add(threadAcquirePermit);
threadAcquirePermit.start();
}
return threadAcquirePermits;
}
}