HDFS-13274. RBF: Extend RouterRpcClient to use multiple sockets (#4531)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index bd2d8c9..df026bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -20,6 +20,7 @@
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Time;
@@ -53,9 +54,14 @@
   private long lastActiveTs = 0;
   /** The connection's active status would expire after this window. */
   private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
+  /** The maximum number of requests that this connection can handle concurrently. */
+  private final int maxConcurrencyPerConn;
 
-  public ConnectionContext(ProxyAndInfo<?> connection) {
+  public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
     this.client = connection;
+    this.maxConcurrencyPerConn = conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
+        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
   }
 
   /**
@@ -93,6 +99,23 @@
    * @return True if the connection can be used.
    */
   public synchronized boolean isUsable() {
+    return hasAvailableConcurrency() && !isClosed();
+  }
+
+  /**
+   * Check if this connection context still has available concurrency.
+   * @return True if this connection can handle more concurrent requests.
+   */
+  private synchronized boolean hasAvailableConcurrency() {
+    return this.numThreads < maxConcurrencyPerConn;
+  }
+
+  /**
+   * Check if the connection is idle: it is not being used by any thread
+   * and it has not been closed.
+   * @return True if the connection is idle.
+   */
+  public synchronized boolean isIdle() {
     return !isActive() && !isClosed();
   }
 
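For intuition, here is a minimal, self-contained sketch of the semantics this change introduces (a hypothetical `Conn` class standing in for `ConnectionContext`, ignoring the closed state): a connection stays usable until `maxConcurrencyPerConn` requests are in flight, is active whenever at least one request is in flight, and is idle only when none are.

```java
public class ConcurrencySemanticsSketch {
  static class Conn {
    private int numThreads;            // requests currently in flight
    private final int maxConcurrency;  // dfs.federation.router.max.concurrency.per.connection

    Conn(int maxConcurrency) {
      this.maxConcurrency = maxConcurrency;
    }

    synchronized void acquire() { numThreads++; }   // like getClient()
    synchronized void release() { numThreads--; }   // like release()
    synchronized boolean isActive() { return numThreads > 0; }
    synchronized boolean isUsable() { return numThreads < maxConcurrency; }
    synchronized boolean isIdle()   { return numThreads == 0; }
  }

  public static void main(String[] args) {
    Conn conn = new Conn(20);
    for (int i = 0; i < 20; i++) {
      conn.acquire();
    }
    System.out.println(conn.isUsable()); // false: already at max concurrency
    System.out.println(conn.isActive()); // true: requests are in flight
    System.out.println(conn.isIdle());   // false: not safe to close yet
  }
}
```

With the default of 1, `isUsable()` reduces to the previous behaviour, where a connection is handed out to only one caller at a time.
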
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 293a4b6..a2aa7c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -77,7 +77,6 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(ConnectionPool.class);
 
-
   /** Configuration settings for the connection pool. */
   private final Configuration conf;
 
@@ -94,6 +93,8 @@
   private volatile List<ConnectionContext> connections = new ArrayList<>();
   /** Connection index for round-robin. */
   private final AtomicInteger clientIndex = new AtomicInteger(0);
+  /** Underlying socket index. */
+  private final AtomicInteger socketIndex = new AtomicInteger(0);
 
   /** Min number of connections per user. */
   private final int minSize;
@@ -105,6 +106,9 @@
   /** The last time a connection was active. */
   private volatile long lastActiveTime = 0;
 
+  /** Whether to use multiple physical sockets. */
+  private final boolean enableMultiSocket;
+
   /** Map for the protocols and their protobuf implementations. */
   private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
   static {
@@ -149,9 +153,12 @@
     this.minSize = minPoolSize;
     this.maxSize = maxPoolSize;
     this.minActiveRatio = minActiveRatio;
+    this.enableMultiSocket = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);
 
     // Add minimum connections to the pool
-    for (int i=0; i<this.minSize; i++) {
+    for (int i = 0; i < this.minSize; i++) {
       ConnectionContext newConnection = newConnection();
       this.connections.add(newConnection);
     }
@@ -210,24 +217,23 @@
    * @return Connection context.
    */
   protected ConnectionContext getConnection() {
-
     this.lastActiveTime = Time.now();
-
-    // Get a connection from the pool following round-robin
-    ConnectionContext conn = null;
     List<ConnectionContext> tmpConnections = this.connections;
-    int size = tmpConnections.size();
-    // Inc and mask off sign bit, lookup index should be non-negative int
-    int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
-    for (int i=0; i<size; i++) {
-      int index = (threadIndex + i) % size;
-      conn = tmpConnections.get(index);
-      if (conn != null && conn.isUsable()) {
-        return conn;
+    for (ConnectionContext tmpConnection : tmpConnections) {
+      if (tmpConnection != null && tmpConnection.isUsable()) {
+        return tmpConnection;
       }
     }
 
-    // We return a connection even if it's active
+    ConnectionContext conn = null;
+    // We return a connection even if it's busy
+    int size = tmpConnections.size();
+    if (size > 0) {
+      // Get a connection from the pool following round-robin
+      // Inc and mask off sign bit, lookup index should be non-negative int
+      int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
+      conn = tmpConnections.get(threadIndex % size);
+    }
     return conn;
   }
 
@@ -256,10 +262,9 @@
       int targetCount = Math.min(num, this.connections.size() - this.minSize);
       // Remove and close targetCount of connections
       List<ConnectionContext> tmpConnections = new ArrayList<>();
-      for (int i = 0; i < this.connections.size(); i++) {
-        ConnectionContext conn = this.connections.get(i);
+      for (ConnectionContext conn : this.connections) {
         // Only pick idle connections to close
-        if (removed.size() < targetCount && conn.isUsable()) {
+        if (removed.size() < targetCount && conn.isIdle()) {
           removed.add(conn);
         } else {
           tmpConnections.add(conn);
@@ -267,8 +272,8 @@
       }
       this.connections = tmpConnections;
     }
-    LOG.debug("Expected to remove {} connection " +
-        "and actually removed {} connections", num, removed.size());
+    LOG.debug("Expected to remove {} connection and actually removed {} connections",
+        num, removed.size());
     return removed;
   }
 
@@ -303,7 +308,6 @@
    */
   protected int getNumActiveConnections() {
     int ret = 0;
-
     List<ConnectionContext> tmpConnections = this.connections;
     for (ConnectionContext conn : tmpConnections) {
       if (conn.isActive()) {
@@ -320,10 +324,9 @@
    */
   protected int getNumIdleConnections() {
     int ret = 0;
-
     List<ConnectionContext> tmpConnections = this.connections;
     for (ConnectionContext conn : tmpConnections) {
-      if (conn.isUsable()) {
+      if (conn.isIdle()) {
         ret++;
       }
     }
@@ -393,8 +396,9 @@
    * @throws IOException If it cannot get a new connection.
    */
   public ConnectionContext newConnection() throws IOException {
-    return newConnection(
-        this.conf, this.namenodeAddress, this.ugi, this.protocol);
+    return newConnection(this.conf, this.namenodeAddress,
+        this.ugi, this.protocol, this.enableMultiSocket,
+        this.socketIndex.incrementAndGet());
   }
 
   /**
@@ -402,19 +406,20 @@
    * context for a single user/security context. To maximize throughput it is
    * recommended to use multiple connection per user+server, allowing multiple
    * writes and reads to be dispatched in parallel.
-   * @param <T>
+   * @param <T> Type of the protocol interface.
    *
    * @param conf Configuration for the connection.
    * @param nnAddress Address of server supporting the ClientProtocol.
    * @param ugi User context.
    * @param proto Interface of the protocol.
+   * @param enableMultiSocket Whether to enable multiple sockets.
+   * @param socketIndex Index used to distinguish the underlying sockets.
    * @return proto for the target ClientProtocol that contains the user's
    *         security context.
    * @throws IOException If it cannot be created.
    */
   protected static <T> ConnectionContext newConnection(Configuration conf,
-      String nnAddress, UserGroupInformation ugi, Class<T> proto)
-      throws IOException {
+      String nnAddress, UserGroupInformation ugi, Class<T> proto,
+      boolean enableMultiSocket, int socketIndex) throws IOException {
     if (!PROTO_MAP.containsKey(proto)) {
       String msg = "Unsupported protocol for connection to NameNode: "
           + ((proto != null) ? proto.getName() : "null");
@@ -437,15 +442,23 @@
     }
     InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
     final long version = RPC.getProtocolVersion(classes.protoPb);
-    Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
-        conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+    Object proxy;
+    if (enableMultiSocket) {
+      FederationConnectionId connectionId = new FederationConnectionId(
+          socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
+          defaultPolicy, conf, socketIndex);
+      proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
+          conf, factory).getProxy();
+    } else {
+      proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
+          conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+    }
+
     T client = newProtoClient(proto, classes, proxy);
     Text dtService = SecurityUtil.buildTokenService(socket);
 
-    ProxyAndInfo<T> clientProxy =
-        new ProxyAndInfo<T>(client, dtService, socket);
-    ConnectionContext connection = new ConnectionContext(clientProxy);
-    return connection;
+    ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket);
+    return new ConnectionContext(clientProxy, conf);
   }
 
   private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
@@ -453,7 +466,7 @@
     try {
       Constructor<?> constructor =
           classes.protoClientPb.getConstructor(classes.protoPb);
-      Object o = constructor.newInstance(new Object[] {proxy});
+      Object o = constructor.newInstance(proxy);
       if (proto.isAssignableFrom(o.getClass())) {
         @SuppressWarnings("unchecked")
         T client = (T) o;
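Why does a distinct socketIndex produce a distinct physical socket? The Hadoop IPC client reuses one connection (and therefore one socket) per distinct connection id, so folding the index into equals()/hashCode() of FederationConnectionId keeps ids that differ only in the index from colliding in that cache. A rough sketch of the mechanism under that assumption, with hypothetical names (ConnId, SocketCacheSketch, socketFor) rather than the real Client internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SocketCacheSketch {
  // Simplified stand-in for Client.ConnectionId: identity is (address, user, index).
  static final class ConnId {
    final String address;
    final String user;
    final int index;

    ConnId(String address, String user, int index) {
      this.address = address;
      this.user = user;
      this.index = index;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ConnId)) {
        return false;
      }
      ConnId other = (ConnId) o;
      return address.equals(other.address)
          && user.equals(other.user)
          && index == other.index;
    }

    @Override
    public int hashCode() {
      return Objects.hash(address, user, index);
    }
  }

  // One "socket" per distinct id, mirroring the per-ConnectionId connection
  // cache inside the IPC client.
  private final Map<ConnId, String> sockets = new HashMap<>();

  String socketFor(ConnId id) {
    return sockets.computeIfAbsent(id, k -> "socket-" + sockets.size());
  }

  public static void main(String[] args) {
    SocketCacheSketch cache = new SocketCacheSketch();
    // Same NameNode and user, different socket index -> different sockets.
    System.out.println(cache.socketFor(new ConnId("nn1:8020", "alice", 1))); // socket-0
    System.out.println(cache.socketFor(new ConnId("nn1:8020", "alice", 2))); // socket-1
    System.out.println(cache.socketFor(new ConnId("nn1:8020", "alice", 1))); // socket-0 again
  }
}
```

With dfs.federation.router.enable.multiple.socket disabled, every connection in a pool shares the same ConnectionId and therefore the same socket, which is the bottleneck this change addresses.
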
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java
new file mode 100644
index 0000000..0be1f8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.net.InetSocketAddress;
+
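+/**
+ * Connection id that carries an additional socket index, so that connections
+ * created with different indexes for the same user and NameNode are mapped to
+ * different underlying sockets.
+ */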
+public class FederationConnectionId extends Client.ConnectionId {
+  private final int index;
+
+  public FederationConnectionId(InetSocketAddress address, Class<?> protocol,
+      UserGroupInformation ticket, int rpcTimeout,
+      RetryPolicy connectionRetryPolicy, Configuration conf, int index) {
+    super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+    this.index = index;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(super.hashCode())
+        .append(this.index)
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!super.equals(obj)) {
+      return false;
+    }
+    if (obj instanceof FederationConnectionId) {
+      FederationConnectionId other = (FederationConnectionId)obj;
+      return new EqualsBuilder()
+          .append(this.index, other.index)
+          .isEquals();
+    }
+    return false;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 3b6df41..266e3c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -135,6 +135,12 @@
       FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
   public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
       TimeUnit.SECONDS.toMillis(10);
+  public static final String DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY =
+      FEDERATION_ROUTER_PREFIX + "enable.multiple.socket";
+  public static final boolean DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT = false;
+  public static final String DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY =
+      FEDERATION_ROUTER_PREFIX + "max.concurrency.per.connection";
+  public static final int DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT = 1;
 
   // HDFS Router RPC client
   public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 51d9b8a..a261ddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -135,6 +135,33 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.enable.multiple.socket</name>
+    <value>false</value>
+    <description>
+      Whether to enable multiple downstream sockets. If true, ConnectionPool
+      will use a new socket when creating a new connection for the same user,
+      and RouterRpcClient will get better throughput. It works best together
+      with dfs.federation.router.max.concurrency.per.connection to achieve
+      higher throughput with fewer sockets, for example by enabling
+      dfs.federation.router.enable.multiple.socket and setting
+      dfs.federation.router.max.concurrency.per.connection = 20.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.max.concurrency.per.connection</name>
+    <value>1</value>
+    <description>
+      The maximum number of requests that a connection can handle concurrently.
+      When the number of requests being processed by a socket is less than this
+      value, new requests will be processed by this socket. When
+      dfs.federation.router.enable.multiple.socket is enabled, it is best to set
+      this value greater than 1, such as 20, to avoid frequently creating and
+      idling sockets when a nameservice receives bursty request loads.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.connection.pool.clean.ms</name>
     <value>60000</value>
     <description>
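As a usage sketch (the class name here is illustrative; in practice the values would go into the router's configuration file), the two new keys are meant to be enabled together, mirroring the recommendation above and what the test below does:

```java
import org.apache.hadoop.conf.Configuration;

public class MultiSocketConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Let ConnectionPool create a new underlying socket for each new connection.
    conf.setBoolean("dfs.federation.router.enable.multiple.socket", true);
    // Let each connection serve up to 20 concurrent requests, so fewer sockets
    // are created and each one stays busy under bursty load.
    conf.setInt("dfs.federation.router.max.concurrency.per.connection", 20);
    System.out.println(conf.getInt(
        "dfs.federation.router.max.concurrency.per.connection", 1));
  }
}
```
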
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index ff2cea5..4c6b151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -416,11 +416,13 @@
 The Router forwards the client requests to the NameNodes.
 It uses a pool of connections to reduce the latency of creating them.
 
-| Property | Default | Description|
+| Property | Default | Description |
 |:---- |:---- |:---- |
 | dfs.federation.router.connection.pool-size | 1 | Size of the pool of connections from the router to namenodes. |
 | dfs.federation.router.connection.clean.ms | 10000 | Time interval, in milliseconds, to check if the connection pool should remove unused connections. |
 | dfs.federation.router.connection.pool.clean.ms | 60000 | Time interval, in milliseconds, to check if the connection manager should remove unused connection pools. |
+| dfs.federation.router.enable.multiple.socket | false | If true, ConnectionPool will use a new socket when creating a new connection for the same user. It works best together with dfs.federation.router.max.concurrency.per.connection. |
+| dfs.federation.router.max.concurrency.per.connection | 1 | The maximum number of requests that a connection can handle concurrently. |
 
 ### Admin server
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index acb79cb..04c2540 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.BlockingQueue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertNotNull;
@@ -132,6 +133,44 @@
   }
 
   @Test
+  public void testGetConnectionWithConcurrency() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+    Configuration copyConf = new Configuration(conf);
+    copyConf.setInt(RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 20);
+
+    ConnectionPool pool = new ConnectionPool(
+        copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
+        ClientProtocol.class);
+    poolMap.put(
+        new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
+        pool);
+    assertEquals(1, pool.getNumConnections());
+    // One connection can process up to the maximum number of requests concurrently.
+    for (int i = 0; i < 20; i++) {
+      ConnectionContext cc = pool.getConnection();
+      assertTrue(cc.isUsable());
+      cc.getClient();
+    }
+    assertEquals(1, pool.getNumConnections());
+
+    // Ask for more and this returns an unusable connection
+    ConnectionContext cc1 = pool.getConnection();
+    assertTrue(cc1.isActive());
+    assertFalse(cc1.isUsable());
+
+    // Add a new connection to the pool
+    pool.addConnection(pool.newConnection());
+    // The pool will now return the new connection
+    ConnectionContext cc2 = pool.getConnection();
+    assertTrue(cc2.isUsable());
+    cc2.getClient();
+
+    assertEquals(2, pool.getNumConnections());
+
+    checkPoolConnections(TEST_USER1, 2, 2);
+  }
+
+  @Test
   public void testConnectionCreatorWithException() throws Exception {
     // Create a bad connection pool pointing to unresolvable namenode address.
     ConnectionPool badPool = new ConnectionPool(
@@ -317,6 +356,6 @@
         "Unsupported protocol for connection to NameNode: "
             + TestConnectionManager.class.getName(),
         () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
-            TestConnectionManager.class));
+            TestConnectionManager.class, false, 0));
   }
 }