HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout (#4597)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
index fe498c6..db917be7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
@@ -29,6 +29,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
+
 /**
  * Base fairness policy that implements @RouterRpcFairnessPolicyController.
  * Internally a map of nameservice to Semaphore is used to control permits.
@@ -42,15 +45,26 @@
   /** Hash table to hold semaphore for each configured name service. */
   private Map<String, Semaphore> permits;
 
+  private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
+
   public void init(Configuration conf) {
     this.permits = new HashMap<>();
+    long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT,
+        DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    if (timeoutMs >= 0) {
+      acquireTimeoutMs = timeoutMs;
+    } else {
+      LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
+          "Using default value of : {}ms instead.", timeoutMs,
+          DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
+    }
   }
 
   @Override
   public boolean acquirePermit(String nsId) {
     try {
       LOG.debug("Taking lock for nameservice {}", nsId);
-      return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
+      return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       LOG.debug("Cannot get a permit for nameservice {}", nsId);
     }
@@ -82,15 +96,13 @@
   @Override
   public String getAvailableHandlerOnPerNs() {
     JSONObject json = new JSONObject();
-    for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
+    permits.forEach((k, v) -> {
       try {
-        String nsId = entry.getKey();
-        int availableHandler = entry.getValue().availablePermits();
-        json.put(nsId, availableHandler);
+        json.put(k, v.availablePermits());
       } catch (JSONException e) {
-        LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
+        LOG.warn("Cannot put {} into JSONObject", k, e);
       }
-    }
+    });
     return json.toString();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
index aa0777f..35045bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
@@ -50,13 +50,10 @@
     init(conf);
   }
 
-  public void init(Configuration conf)
-      throws IllegalArgumentException {
+  public void init(Configuration conf) throws IllegalArgumentException {
     super.init(conf);
     // Total handlers configured to process all incoming Rpc.
-    int handlerCount = conf.getInt(
-        DFS_ROUTER_HANDLER_COUNT_KEY,
-        DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
 
     LOG.info("Handlers available for fairness assignment {} ", handlerCount);
 
@@ -71,8 +68,7 @@
     allConfiguredNS.add(CONCURRENT_NS);
     validateHandlersCount(conf, handlerCount, allConfiguredNS);
     for (String nsId : allConfiguredNS) {
-      int dedicatedHandlers =
-          conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
+      int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
       LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
       if (dedicatedHandlers > 0) {
         handlerCount -= dedicatedHandlers;
@@ -86,7 +82,7 @@
     // Assign remaining handlers equally to remaining name services and
     // general pool if applicable.
     if (!unassignedNS.isEmpty()) {
-      LOG.info("Unassigned ns {}", unassignedNS.toString());
+      LOG.info("Unassigned ns {}", unassignedNS);
       int handlersPerNS = handlerCount / unassignedNS.size();
       LOG.info("Handlers available per ns {}", handlersPerNS);
       for (String nsId : unassignedNS) {
@@ -101,24 +97,20 @@
     int existingPermits = getAvailablePermits(CONCURRENT_NS);
     if (leftOverHandlers > 0) {
       LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
-      insertNameServiceWithPermits(CONCURRENT_NS,
-          existingPermits + leftOverHandlers);
+      insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers);
     }
-    LOG.info("Final permit allocation for concurrent ns: {}",
-        getAvailablePermits(CONCURRENT_NS));
+    LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS));
   }
 
   private static void logAssignment(String nsId, int count) {
-    LOG.info("Assigned {} handlers to nsId {} ",
-        count, nsId);
+    LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
   }
 
-  private void validateHandlersCount(Configuration conf, int handlerCount,
-                                     Set<String> allConfiguredNS) {
+  private void validateHandlersCount(Configuration conf,
+      int handlerCount, Set<String> allConfiguredNS) {
     int totalDedicatedHandlers = 0;
     for (String nsId : allConfiguredNS) {
-      int dedicatedHandlers =
-              conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
+      int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
       if (dedicatedHandlers > 0) {
         // Total handlers should not be less than sum of dedicated handlers.
         totalDedicatedHandlers += dedicatedHandlers;
@@ -128,8 +120,7 @@
       }
     }
     if (totalDedicatedHandlers > handlerCount) {
-      String msg = String.format(ERROR_MSG, handlerCount,
-          totalDedicatedHandlers);
+      String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers);
       LOG.error(msg);
       throw new IllegalArgumentException(msg);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index c0a9e3f..3b6df41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -354,6 +354,10 @@
       NoRouterRpcFairnessPolicyController.class;
   public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
       FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
+  public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT =
+      FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
+  public static final long   DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(1);
 
   // HDFS Router Federation Rename.
   public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index fcf6a28..51d9b8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -724,6 +724,14 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.fairness.acquire.timeout</name>
+    <value>1s</value>
+    <description>
+      The maximum time to wait for a permit.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.federation.rename.bandwidth</name>
     <value>10</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
index 8307f66..1f5770b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
@@ -23,10 +23,14 @@
 import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@@ -84,6 +88,26 @@
   }
 
   @Test
+  public void testAcquireTimeout() {
+    Configuration conf = createConf(40);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
+    conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
+
+    // ns1 should have 30 permits allocated
+    for (int i = 0; i < 30; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    }
+    long acquireBeginTimeMs = Time.monotonicNow();
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
+
+    // There are some other operations, so acquireTimeMs >= 100ms.
+    assertTrue(acquireTimeMs >= 100);
+  }
+
+  @Test
   public void testAllocationErrorWithZeroHandlers() {
     Configuration conf = createConf(0);
     verifyInstantiationError(conf, 0, 3);