blob: 04c2540c2aaf99250e85676d7d11230a2266b0f2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
/**
* Test functionalities of {@link ConnectionManager}, which manages a pool
* of connections to NameNodes.
*/
public class TestConnectionManager {
private Configuration conf;
private ConnectionManager connManager;
private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"};
private static final UserGroupInformation TEST_USER1 =
UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
private static final UserGroupInformation TEST_USER2 =
UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
private static final UserGroupInformation TEST_USER3 =
UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
private static final String TEST_NN_ADDRESS = "nn1:8080";
private static final String UNRESOLVED_TEST_NN_ADDRESS = "unknownhost:8080";
@Before
public void setup() throws Exception {
conf = new Configuration();
connManager = new ConnectionManager(conf);
NetUtils.addStaticResolution("nn1", "localhost");
NetUtils.createSocketAddrForHost("nn1", 8080);
connManager.start();
}
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
@After
public void shutdown() {
if (connManager != null) {
connManager.close();
}
}
@Test
public void testCleanup() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool1, 9, 4);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1);
ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool2, 10, 10);
poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
pool2);
checkPoolConnections(TEST_USER1, 9, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Clean up first pool, one connection should be removed, and second pool
// should remain the same.
connManager.cleanup(pool1);
checkPoolConnections(TEST_USER1, 8, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Clean up the first pool again, it should have no effect since it reached
// the MIN_ACTIVE_RATIO.
connManager.cleanup(pool1);
checkPoolConnections(TEST_USER1, 8, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool3, 8, 0);
poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
pool3);
checkPoolConnections(TEST_USER3, 10, 0);
for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3);
}
checkPoolConnections(TEST_USER3, 2, 0);
// With active connections added to pool, make sure it honors the
// MIN_ACTIVE_RATIO again
addConnectionsToPool(pool3, 8, 2);
checkPoolConnections(TEST_USER3, 10, 2);
for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3);
}
checkPoolConnections(TEST_USER3, 4, 2);
}
@Test
public void testGetConnectionWithConcurrency() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
Configuration copyConf = new Configuration(conf);
copyConf.setInt(RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 20);
ConnectionPool pool = new ConnectionPool(
copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
ClientProtocol.class);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool);
assertEquals(1, pool.getNumConnections());
// one connection can process the maximum number of requests concurrently.
for (int i = 0; i < 20; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
}
assertEquals(1, pool.getNumConnections());
// Ask for more and this returns an unusable connection
ConnectionContext cc1 = pool.getConnection();
assertTrue(cc1.isActive());
assertFalse(cc1.isUsable());
// add a new connection into pool
pool.addConnection(pool.newConnection());
// will return the new connection
ConnectionContext cc2 = pool.getConnection();
assertTrue(cc2.isUsable());
cc2.getClient();
assertEquals(2, pool.getNumConnections());
checkPoolConnections(TEST_USER1, 2, 2);
}
@Test
public void testConnectionCreatorWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f,
ClientProtocol.class);
BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1);
queue.add(badPool);
ConnectionManager.ConnectionCreator connectionCreator =
new ConnectionManager.ConnectionCreator(queue);
connectionCreator.setDaemon(true);
connectionCreator.start();
// Wait to make sure async thread is scheduled and picks
GenericTestUtils.waitFor(queue::isEmpty, 50, 5000);
// At this point connection creation task should be definitely picked up.
assertTrue(queue.isEmpty());
// At this point connection thread should still be alive.
assertTrue(connectionCreator.isAlive());
// Stop the thread as test is successful at this point
connectionCreator.interrupt();
}
@Test
public void testGetConnectionWithException() throws Exception {
String exceptionCause = "java.net.UnknownHostException: unknownhost";
exceptionRule.expect(IllegalArgumentException.class);
exceptionRule.expectMessage(exceptionCause);
// Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
ClientProtocol.class);
}
@Test
public void testGetConnection() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
@Test
public void testValidClientIndex() throws Exception {
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class);
for(int i = -3; i <= 3; i++) {
pool.getClientIndex().set(i);
ConnectionContext conn = pool.getConnection();
assertNotNull(conn);
assertTrue(conn.isUsable());
}
}
@Test
public void getGetConnectionNamenodeProtocol() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(
TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,
int numActiveConn) throws IOException {
for (int i = 0; i < numTotalConn; i++) {
ConnectionContext cc = pool.newConnection();
pool.addConnection(cc);
if (i < numActiveConn) {
cc.getClient();
}
}
}
private void checkPoolConnections(UserGroupInformation ugi,
int numOfConns, int numOfActiveConns) {
boolean connPoolFoundForUser = false;
for (Map.Entry<ConnectionPoolId, ConnectionPool> e :
connManager.getPools().entrySet()) {
if (e.getKey().getUgi() == ugi) {
assertEquals(numOfConns, e.getValue().getNumConnections());
assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
// idle + active = total connections
assertEquals(numOfConns - numOfActiveConns,
e.getValue().getNumIdleConnections());
connPoolFoundForUser = true;
}
}
if (!connPoolFoundForUser) {
fail("Connection pool not found for user " + ugi.getUserName());
}
}
@Test
public void testConfigureConnectionActiveRatio() throws IOException {
// test 1 conn below the threshold and these conns are closed
testConnectionCleanup(0.8f, 10, 7, 9);
// test 2 conn below the threshold and these conns are closed
testConnectionCleanup(0.8f, 10, 6, 8);
}
private void testConnectionCleanup(float ratio, int totalConns,
int activeConns, int leftConns) throws IOException {
Configuration tmpConf = new Configuration();
// Set dfs.federation.router.connection.min-active-ratio
tmpConf.setFloat(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, ratio);
ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
tmpConnManager.start();
// Create one new connection pool
tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
NamenodeProtocol.class);
Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
TEST_NN_ADDRESS, NamenodeProtocol.class);
ConnectionPool pool = poolMap.get(connectionPoolId);
// Test min active ratio is as set value
assertEquals(ratio, pool.getMinActiveRatio(), 0.001f);
pool.getConnection().getClient();
// Test there is one active connection in pool
assertEquals(1, pool.getNumActiveConnections());
// Add other active-1 connections / totalConns-1 connections to pool
addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
// There are activeConn connections.
// We can cleanup the pool
tmpConnManager.cleanup(pool);
assertEquals(leftConns, pool.getNumConnections());
tmpConnManager.close();
}
@Test
public void testUnsupportedProtoExceptionMsg() throws Exception {
LambdaTestUtils.intercept(IllegalStateException.class,
"Unsupported protocol for connection to NameNode: "
+ TestConnectionManager.class.getName(),
() -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
TestConnectionManager.class, false, 0));
}
}