HDFS-13274. RBF: Extend RouterRpcClient to use multiple sockets (#4531)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index bd2d8c9..df026bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
@@ -53,9 +54,14 @@
private long lastActiveTs = 0;
/** The connection's active status would expire after this window. */
private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
+ /** The maximum number of requests that this connection can handle concurrently. **/
+ private final int maxConcurrencyPerConn;
- public ConnectionContext(ProxyAndInfo<?> connection) {
+ public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
this.client = connection;
+ this.maxConcurrencyPerConn = conf.getInt(
+ RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
+ RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
}
/**
@@ -93,6 +99,23 @@
* @return True if the connection can be used.
*/
public synchronized boolean isUsable() {
+ return hasAvailableConcurrency() && !isClosed();
+ }
+
+ /**
+ * Return true if this connection context still has available concurrency,
+ * else return false.
+ */
+ private synchronized boolean hasAvailableConcurrency() {
+ return this.numThreads < maxConcurrencyPerConn;
+ }
+
+ /**
+ * Check if the connection is idle. It checks if the connection is not used
+ * by another thread.
+ * @return True if the connection is not used by another thread.
+ */
+ public synchronized boolean isIdle() {
return !isActive() && !isClosed();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 293a4b6..a2aa7c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -77,7 +77,6 @@
private static final Logger LOG =
LoggerFactory.getLogger(ConnectionPool.class);
-
/** Configuration settings for the connection pool. */
private final Configuration conf;
@@ -94,6 +93,8 @@
private volatile List<ConnectionContext> connections = new ArrayList<>();
/** Connection index for round-robin. */
private final AtomicInteger clientIndex = new AtomicInteger(0);
+ /** Underlying socket index. **/
+ private final AtomicInteger socketIndex = new AtomicInteger(0);
/** Min number of connections per user. */
private final int minSize;
@@ -105,6 +106,9 @@
/** The last time a connection was active. */
private volatile long lastActiveTime = 0;
+ /** Enable using multiple physical socket or not. **/
+ private final boolean enableMultiSocket;
+
/** Map for the protocols and their protobuf implementations. */
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
static {
@@ -149,9 +153,12 @@
this.minSize = minPoolSize;
this.maxSize = maxPoolSize;
this.minActiveRatio = minActiveRatio;
+ this.enableMultiSocket = conf.getBoolean(
+ RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
+ RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);
// Add minimum connections to the pool
- for (int i=0; i<this.minSize; i++) {
+ for (int i = 0; i < this.minSize; i++) {
ConnectionContext newConnection = newConnection();
this.connections.add(newConnection);
}
@@ -210,24 +217,23 @@
* @return Connection context.
*/
protected ConnectionContext getConnection() {
-
this.lastActiveTime = Time.now();
-
- // Get a connection from the pool following round-robin
- ConnectionContext conn = null;
List<ConnectionContext> tmpConnections = this.connections;
- int size = tmpConnections.size();
- // Inc and mask off sign bit, lookup index should be non-negative int
- int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
- for (int i=0; i<size; i++) {
- int index = (threadIndex + i) % size;
- conn = tmpConnections.get(index);
- if (conn != null && conn.isUsable()) {
- return conn;
+ for (ConnectionContext tmpConnection : tmpConnections) {
+ if (tmpConnection != null && tmpConnection.isUsable()) {
+ return tmpConnection;
}
}
- // We return a connection even if it's active
+ ConnectionContext conn = null;
+ // We return a connection even if it's busy
+ int size = tmpConnections.size();
+ if (size > 0) {
+ // Get a connection from the pool following round-robin
+ // Inc and mask off sign bit, lookup index should be non-negative int
+ int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
+ conn = tmpConnections.get(threadIndex % size);
+ }
return conn;
}
@@ -256,10 +262,9 @@
int targetCount = Math.min(num, this.connections.size() - this.minSize);
// Remove and close targetCount of connections
List<ConnectionContext> tmpConnections = new ArrayList<>();
- for (int i = 0; i < this.connections.size(); i++) {
- ConnectionContext conn = this.connections.get(i);
+ for (ConnectionContext conn : this.connections) {
// Only pick idle connections to close
- if (removed.size() < targetCount && conn.isUsable()) {
+ if (removed.size() < targetCount && conn.isIdle()) {
removed.add(conn);
} else {
tmpConnections.add(conn);
@@ -267,8 +272,8 @@
}
this.connections = tmpConnections;
}
- LOG.debug("Expected to remove {} connection " +
- "and actually removed {} connections", num, removed.size());
+ LOG.debug("Expected to remove {} connection and actually removed {} connections",
+ num, removed.size());
return removed;
}
@@ -303,7 +308,6 @@
*/
protected int getNumActiveConnections() {
int ret = 0;
-
List<ConnectionContext> tmpConnections = this.connections;
for (ConnectionContext conn : tmpConnections) {
if (conn.isActive()) {
@@ -320,10 +324,9 @@
*/
protected int getNumIdleConnections() {
int ret = 0;
-
List<ConnectionContext> tmpConnections = this.connections;
for (ConnectionContext conn : tmpConnections) {
- if (conn.isUsable()) {
+ if (conn.isIdle()) {
ret++;
}
}
@@ -393,8 +396,9 @@
* @throws IOException If it cannot get a new connection.
*/
public ConnectionContext newConnection() throws IOException {
- return newConnection(
- this.conf, this.namenodeAddress, this.ugi, this.protocol);
+ return newConnection(this.conf, this.namenodeAddress,
+ this.ugi, this.protocol, this.enableMultiSocket,
+ this.socketIndex.incrementAndGet());
}
/**
@@ -402,19 +406,20 @@
* context for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
- * @param <T>
+ * @param <T> Input type T.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @param proto Interface of the protocol.
+ * @param enableMultiSocket Enable multiple socket or not.
* @return proto for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static <T> ConnectionContext newConnection(Configuration conf,
- String nnAddress, UserGroupInformation ugi, Class<T> proto)
- throws IOException {
+ String nnAddress, UserGroupInformation ugi, Class<T> proto,
+ boolean enableMultiSocket, int socketIndex) throws IOException {
if (!PROTO_MAP.containsKey(proto)) {
String msg = "Unsupported protocol for connection to NameNode: "
+ ((proto != null) ? proto.getName() : "null");
@@ -437,15 +442,23 @@
}
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
final long version = RPC.getProtocolVersion(classes.protoPb);
- Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
- conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+ Object proxy;
+ if (enableMultiSocket) {
+ FederationConnectionId connectionId = new FederationConnectionId(
+ socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
+ defaultPolicy, conf, socketIndex);
+ proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
+ conf, factory).getProxy();
+ } else {
+ proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
+ conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+ }
+
T client = newProtoClient(proto, classes, proxy);
Text dtService = SecurityUtil.buildTokenService(socket);
- ProxyAndInfo<T> clientProxy =
- new ProxyAndInfo<T>(client, dtService, socket);
- ConnectionContext connection = new ConnectionContext(clientProxy);
- return connection;
+ ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket);
+ return new ConnectionContext(clientProxy, conf);
}
private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
@@ -453,7 +466,7 @@
try {
Constructor<?> constructor =
classes.protoClientPb.getConstructor(classes.protoPb);
- Object o = constructor.newInstance(new Object[] {proxy});
+ Object o = constructor.newInstance(proxy);
if (proto.isAssignableFrom(o.getClass())) {
@SuppressWarnings("unchecked")
T client = (T) o;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java
new file mode 100644
index 0000000..0be1f8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.net.InetSocketAddress;
+
+public class FederationConnectionId extends Client.ConnectionId {
+ private final int index;
+
+ public FederationConnectionId(InetSocketAddress address, Class<?> protocol,
+ UserGroupInformation ticket, int rpcTimeout,
+ RetryPolicy connectionRetryPolicy, Configuration conf, int index) {
+ super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+ this.index = index;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(super.hashCode())
+ .append(this.index)
+ .toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!super.equals(obj)) {
+ return false;
+ }
+ if (obj instanceof FederationConnectionId) {
+ FederationConnectionId other = (FederationConnectionId)obj;
+ return new EqualsBuilder()
+ .append(this.index, other.index)
+ .isEquals();
+ }
+ return false;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 3b6df41..266e3c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -135,6 +135,12 @@
FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
+ public static final String DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY =
+ FEDERATION_ROUTER_PREFIX + "enable.multiple.socket";
+ public static final boolean DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT = false;
+ public static final String DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY =
+ FEDERATION_ROUTER_PREFIX + "max.concurrency.per.connection";
+ public static final int DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT = 1;
// HDFS Router RPC client
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 51d9b8a..a261ddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -135,6 +135,33 @@
</property>
<property>
+ <name>dfs.federation.router.enable.multiple.socket</name>
+ <value>false</value>
+ <description>
+ If enable multiple downstream socket or not. If true, ConnectionPool
+ will use a new socket when creating a new connection for the same user,
+ and RouterRPCClient will get a better throughput. It's best used with
+ dfs.federation.router.max.concurrency.per.connection together to get
+ a better throughput with fewer sockets. Such as enable
+ dfs.federation.router.enable.multiple.socket and
+ set dfs.federation.router.max.concurrency.per.connection = 20.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.max.concurrency.per.connection</name>
+ <value>1</value>
+ <description>
+ The maximum number of requests that a connection can handle concurrently.
+ When the number of requests being processed by a socket is less than this value,
+ new request will be processed by this socket. When enable
+ dfs.federation.router.enable.multiple.socket, it's best
+ set this value greater than 1, such as 20, to avoid frequent
+ creation and idle sockets in the case of a NS with jitter requests.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.connection.pool.clean.ms</name>
<value>60000</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index ff2cea5..4c6b151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -416,11 +416,13 @@
The Router forwards the client requests to the NameNodes.
It uses a pool of connections to reduce the latency of creating them.
-| Property | Default | Description|
+| Property | Default | Description |
|:---- |:---- |:---- |
| dfs.federation.router.connection.pool-size | 1 | Size of the pool of connections from the router to namenodes. |
| dfs.federation.router.connection.clean.ms | 10000 | Time interval, in milliseconds, to check if the connection pool should remove unused connections. |
| dfs.federation.router.connection.pool.clean.ms | 60000 | Time interval, in milliseconds, to check if the connection manager should remove unused connection pools. |
+| dfs.federation.router.enable.multiple.socket | false | If true, ConnectionPool will use a new socket when creating a new connection for the same user. And it's best used with dfs.federation.router.max.concurrency.per.connection together. |
+| dfs.federation.router.max.concurrency.per.connection | 1 | The maximum number of requests that a connection can handle concurrently. |
### Admin server
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index acb79cb..04c2540 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -36,6 +36,7 @@
import java.util.concurrent.BlockingQueue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
@@ -132,6 +133,44 @@
}
@Test
+ public void testGetConnectionWithConcurrency() throws Exception {
+ Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+ Configuration copyConf = new Configuration(conf);
+ copyConf.setInt(RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 20);
+
+ ConnectionPool pool = new ConnectionPool(
+ copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
+ ClientProtocol.class);
+ poolMap.put(
+ new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
+ pool);
+ assertEquals(1, pool.getNumConnections());
+ // one connection can process the maximum number of requests concurrently.
+ for (int i = 0; i < 20; i++) {
+ ConnectionContext cc = pool.getConnection();
+ assertTrue(cc.isUsable());
+ cc.getClient();
+ }
+ assertEquals(1, pool.getNumConnections());
+
+ // Ask for more and this returns an unusable connection
+ ConnectionContext cc1 = pool.getConnection();
+ assertTrue(cc1.isActive());
+ assertFalse(cc1.isUsable());
+
+ // add a new connection into pool
+ pool.addConnection(pool.newConnection());
+ // will return the new connection
+ ConnectionContext cc2 = pool.getConnection();
+ assertTrue(cc2.isUsable());
+ cc2.getClient();
+
+ assertEquals(2, pool.getNumConnections());
+
+ checkPoolConnections(TEST_USER1, 2, 2);
+ }
+
+ @Test
public void testConnectionCreatorWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool(
@@ -317,6 +356,6 @@
"Unsupported protocol for connection to NameNode: "
+ TestConnectionManager.class.getName(),
() -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
- TestConnectionManager.class));
+ TestConnectionManager.class, false, 0));
}
}