blob: 3c51e13182c92143de8118af9bb0fe985a54a3e4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test the Router overload control which rejects requests when the RPC client
 * is overloaded. This feature is managed by
 * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
 */
public class TestRouterClientRejectOverload {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterClientRejectOverload.class);

  /** Federated cluster under test; created by {@link #setupCluster}. */
  private StateStoreDFSCluster cluster;

  /** Shut down the cluster started by the test, if any. */
  @After
  public void cleanup() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Build and start a federated cluster whose Routers use a reduced client
   * thread pool (4 threads) so they are easy to overload. No datanodes are
   * started because the tests only issue renewLease() calls.
   * @param overloadControl Value for
   *          {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
   * @throws Exception If the cluster cannot be started.
   */
  private void setupCluster(boolean overloadControl) throws Exception {
    // Build and start a federated cluster
    cluster = new StateStoreDFSCluster(false, 2);
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .admin()
        .rpc()
        .build();

    // Reduce the number of RPC client threads to overload the Router easily
    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
    // Overload control
    routerConf.setBoolean(
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);

    // No need for datanodes as we use renewLease() for testing
    cluster.setNumDatanodesPerNameservice(0);

    cluster.addRouterOverrides(routerConf);
    cluster.startCluster();
    cluster.startRouters();
    cluster.waitClusterUp();
  }

  /**
   * Without overload control no request is ever rejected, even when a
   * subcluster is slow; the requests just take longer.
   */
  @Test
  public void testWithoutOverloadControl() throws Exception {
    setupCluster(false);

    // Nobody should get overloaded
    testOverloaded(0);

    // Set subcluster 0 as slow
    MiniDFSCluster dfsCluster = cluster.getCluster();
    NameNode nn0 = dfsCluster.getNameNode(0);
    simulateSlowNamenode(nn0, 1);

    // Nobody should get overloaded, but it will be really slow
    testOverloaded(0);

    // No rejected requests expected
    for (RouterContext router : cluster.getRouters()) {
      FederationRPCMetrics rpcMetrics =
          router.getRouter().getRpcServer().getRPCMetrics();
      assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
    }
  }

  /**
   * With overload control a slow subcluster causes some requests to be
   * rejected, and a client configured with both Routers spreads the load.
   */
  @Test
  public void testOverloadControl() throws Exception {
    setupCluster(true);

    List<RouterContext> routers = cluster.getRouters();
    FederationRPCMetrics rpcMetrics0 =
        routers.get(0).getRouter().getRpcServer().getRPCMetrics();
    FederationRPCMetrics rpcMetrics1 =
        routers.get(1).getRouter().getRpcServer().getRPCMetrics();

    // Nobody should get overloaded
    testOverloaded(0);
    assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
    assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());

    // Set subcluster 0 as slow
    MiniDFSCluster dfsCluster = cluster.getCluster();
    NameNode nn0 = dfsCluster.getNameNode(0);
    simulateSlowNamenode(nn0, 1);

    // The subcluster should be overloaded now and reject between 4 and 6
    // requests
    testOverloaded(4, 6);
    long rejected = rpcMetrics0.getProxyOpFailureClientOverloaded()
        + rpcMetrics1.getProxyOpFailureClientOverloaded();
    assertTrue("Expected >=4 overloaded ops in the Router metrics but was "
        + rejected, rejected >= 4);

    // Client using HA with 2 Routers
    // A single Router gets overloaded, but 2 will handle it
    Configuration clientConf = cluster.getRouterClientConf();

    // renewLease() goes to all subclusters, so 10 client ops yield 2*10
    // proxy ops; each Router should get a similar share (>=8)
    long iniProxyOps0 = rpcMetrics0.getProxyOps();
    long iniProxyOps1 = rpcMetrics1.getProxyOps();
    testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
    long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
    long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
    assertEquals(2 * 10, proxyOps0 + proxyOps1);
    assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
    assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
  }

  /**
   * Submit requests to a random Router and expect exactly the given number
   * of them to be rejected as overloaded.
   * @param expOverload Exact number of requests expected as overloaded.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverload) throws Exception {
    testOverloaded(expOverload, expOverload);
  }

  /**
   * Submit 10 requests directly to a randomly chosen Router using a default
   * client configuration and check how many were rejected as overloaded.
   * @param expOverloadMin Min number of requests expected as overloaded.
   * @param expOverloadMax Max number of requests expected as overloaded.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverloadMin, int expOverloadMax)
      throws Exception {
    RouterContext routerContext = cluster.getRandomRouter();
    URI address = routerContext.getFileSystemURI();
    Configuration conf = new HdfsConfiguration();
    testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
  }

  /**
   * Test if the Router gets overloaded by submitting requests in parallel.
   * We check how many requests got rejected at the end.
   * @param expOverloadMin Min number of requests expected as overloaded.
   * @param expOverloadMax Max number of requests expected as overloaded.
   * @param address Destination address.
   * @param conf Configuration of the client.
   * @param numOps Number of operations to submit.
   * @throws Exception If it cannot perform the test.
   */
  private void testOverloaded(int expOverloadMin, int expOverloadMax,
      final URI address, final Configuration conf, final int numOps)
      throws Exception {

    // Submit renewLease() ops which go to all subclusters
    final AtomicInteger overloadException = new AtomicInteger();
    ExecutorService exec = Executors.newFixedThreadPool(numOps);
    try {
      List<Future<?>> futures = new ArrayList<>(numOps);
      for (int i = 0; i < numOps; i++) {
        // Stagger the operations a little (50ms apart)
        final int sleepTime = i * 50;
        Future<?> future = exec.submit(new Runnable() {
          @Override
          public void run() {
            DFSClient routerClient = null;
            try {
              Thread.sleep(sleepTime);
              routerClient = new DFSClient(address, conf);
              String clientName = routerClient.getClientName();
              ClientProtocol routerProto = routerClient.getNamenode();
              routerProto.renewLease(clientName);
            } catch (RemoteException re) {
              // The only remote failure accepted is the overload rejection
              IOException ioe = re.unwrapRemoteException();
              assertTrue("Wrong exception: " + ioe,
                  ioe instanceof StandbyException);
              assertExceptionContains("is overloaded", ioe);
              overloadException.incrementAndGet();
            } catch (IOException e) {
              fail("Unexpected exception: " + e);
            } catch (InterruptedException e) {
              // Restore the interrupt status before failing the task
              Thread.currentThread().interrupt();
              fail("Cannot sleep: " + e);
            } finally {
              if (routerClient != null) {
                try {
                  routerClient.close();
                } catch (IOException e) {
                  LOG.error("Cannot close the client", e);
                }
              }
            }
          }
        });
        futures.add(future);
      }

      // Wait until all the requests are done; get() rethrows (wrapped in an
      // ExecutionException) any assertion failure raised in a worker thread
      for (Future<?> future : futures) {
        future.get();
      }
    } finally {
      // On success every task has already finished, so this is equivalent to
      // shutdown(); on failure it also stops the remaining tasks so the
      // pool's threads are not leaked
      exec.shutdownNow();
    }

    int num = overloadException.get();
    if (expOverloadMin == expOverloadMax) {
      assertEquals(expOverloadMin, num);
    } else {
      assertTrue("Expected >=" + expOverloadMin + " but was " + num,
          num >= expOverloadMin);
      assertTrue("Expected <=" + expOverloadMax + " but was " + num,
          num <= expOverloadMax);
    }
  }
}