blob: 0e1eb40783fe30ffd8bcfe5eedb44f65a371b00c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
/**
* Test functionalities of {@link ConnectionManager}, which manages a pool
* of connections to NameNodes.
*/
public class TestConnectionManager {
private Configuration conf;
private ConnectionManager connManager;
private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"};
private static final UserGroupInformation TEST_USER1 =
UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
private static final UserGroupInformation TEST_USER2 =
UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
private static final UserGroupInformation TEST_USER3 =
UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
private static final String TEST_NN_ADDRESS = "nn1:8080";
@Before
public void setup() throws Exception {
conf = new Configuration();
connManager = new ConnectionManager(conf);
NetUtils.addStaticResolution("nn1", "localhost");
NetUtils.createSocketAddrForHost("nn1", 8080);
connManager.start();
}
@After
public void shutdown() {
if (connManager != null) {
connManager.close();
}
}
@Test
public void testCleanup() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool1, 9, 4);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1);
ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool2, 10, 10);
poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
pool2);
checkPoolConnections(TEST_USER1, 9, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Clean up first pool, one connection should be removed, and second pool
// should remain the same.
connManager.cleanup(pool1);
checkPoolConnections(TEST_USER1, 8, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Clean up the first pool again, it should have no effect since it reached
// the MIN_ACTIVE_RATIO.
connManager.cleanup(pool1);
checkPoolConnections(TEST_USER1, 8, 4);
checkPoolConnections(TEST_USER2, 10, 10);
// Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class);
addConnectionsToPool(pool3, 8, 0);
poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
pool3);
checkPoolConnections(TEST_USER3, 10, 0);
for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3);
}
checkPoolConnections(TEST_USER3, 2, 0);
// With active connections added to pool, make sure it honors the
// MIN_ACTIVE_RATIO again
addConnectionsToPool(pool3, 8, 2);
checkPoolConnections(TEST_USER3, 10, 2);
for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3);
}
checkPoolConnections(TEST_USER3, 4, 2);
}
@Test
public void testGetConnection() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
@Test
public void testValidClientIndex() throws Exception {
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class);
for(int i = -3; i <= 3; i++) {
pool.getClientIndex().set(i);
ConnectionContext conn = pool.getConnection();
assertNotNull(conn);
assertTrue(conn.isUsable());
}
}
@Test
public void getGetConnectionNamenodeProtocol() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(
TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,
int numActiveConn) throws IOException {
for (int i = 0; i < numTotalConn; i++) {
ConnectionContext cc = pool.newConnection();
pool.addConnection(cc);
if (i < numActiveConn) {
cc.getClient();
}
}
}
private void checkPoolConnections(UserGroupInformation ugi,
int numOfConns, int numOfActiveConns) {
boolean connPoolFoundForUser = false;
for (Map.Entry<ConnectionPoolId, ConnectionPool> e :
connManager.getPools().entrySet()) {
if (e.getKey().getUgi() == ugi) {
assertEquals(numOfConns, e.getValue().getNumConnections());
assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
connPoolFoundForUser = true;
}
}
if (!connPoolFoundForUser) {
fail("Connection pool not found for user " + ugi.getUserName());
}
}
}