HDFS-17026. RBF: NamenodeHeartbeatService should update JMX report with configurable frequency. (#5691). Contributed by hchaverri.

Signed-off-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
index 9ffcea1..5c5b132 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -21,6 +21,8 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Time;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -107,6 +110,15 @@
   /** URL scheme to use for JMX calls. */
   private String scheme;
 
+  /** Interval in ms between JMX report updates; 0 updates every time, negative disables. */
+  private long updateJmxIntervalMs;
+  /** Timestamp of last attempt to update JMX report. */
+  private long lastJmxUpdateAttempt;
+  /** Result of the last successful FsNamesystemMetrics report. */
+  private JSONArray fsNamesystemMetrics;
+  /** Result of the last successful NamenodeInfoMetrics report. */
+  private JSONArray namenodeInfoMetrics;
+
   private String resolvedHost;
   private String originalNnId;
 
@@ -233,6 +245,9 @@
       this.healthMonitorTimeoutMs = (int) timeoutMs;
     }
 
+    this.updateJmxIntervalMs = conf.getTimeDuration(DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
+        DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT, TimeUnit.MILLISECONDS);
+
     super.serviceInit(configuration);
   }
 
@@ -447,8 +462,13 @@
       String address, NamenodeStatusReport report) {
     try {
       // TODO part of this should be moved to its own utility
-      getFsNamesystemMetrics(address, report);
-      getNamenodeInfoMetrics(address, report);
+      if (shouldUpdateJmx()) {
+        this.lastJmxUpdateAttempt = Time.monotonicNow();
+        getFsNamesystemMetrics(address);
+        getNamenodeInfoMetrics(address);
+      }
+      populateFsNamesystemMetrics(this.fsNamesystemMetrics, report);
+      populateNamenodeInfoMetrics(this.namenodeInfoMetrics, report);
     } catch (Exception e) {
       LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
     }
@@ -483,16 +503,37 @@
   }
 
   /**
+   * Evaluates whether the JMX report should be refreshed by
+   * calling the Namenode, based on the following conditions:
+   * 1. JMX updates must be enabled (the interval is not negative).
+   * 2. At least the configured interval has elapsed since the last attempt,
+   *    so an interval of 0 refreshes even within the same millisecond.
+   */
+  private boolean shouldUpdateJmx() {
+    if (this.updateJmxIntervalMs < 0) {
+      return false;
+    }
+
+    return Time.monotonicNow() - this.lastJmxUpdateAttempt >= this.updateJmxIntervalMs;
+  }
+
+  /**
    * Fetches NamenodeInfo metrics from namenode.
    * @param address Web interface of the Namenode to monitor.
-   * @param report Namenode status report to update with JMX data.
-   * @throws JSONException
    */
-  private void getNamenodeInfoMetrics(String address,
-      NamenodeStatusReport report) throws JSONException {
+  private void getNamenodeInfoMetrics(String address) {
     String query = "Hadoop:service=NameNode,name=NameNodeInfo";
-    JSONArray aux =
-        FederationUtil.getJmx(query, address, connectionFactory, scheme);
+    this.namenodeInfoMetrics = FederationUtil.getJmx(query, address, connectionFactory, scheme);
+  }
+
+  /**
+   * Populates NamenodeInfo metrics into report.
+   * @param aux NamenodeInfo metrics from namenode.
+   * @param report Namenode status report to update with JMX data.
+   * @throws JSONException When an invalid JSONObject is found.
+   */
+  private void populateNamenodeInfoMetrics(JSONArray aux, NamenodeStatusReport report)
+      throws JSONException {
     if (aux != null && aux.length() > 0) {
       JSONObject jsonObject = aux.getJSONObject(0);
       String name = jsonObject.getString("name");
@@ -510,14 +551,20 @@
   /**
    * Fetches FSNamesystem* metrics from namenode.
    * @param address Web interface of the Namenode to monitor.
-   * @param report Namenode status report to update with JMX data.
-   * @throws JSONException
    */
-  private void getFsNamesystemMetrics(String address,
-      NamenodeStatusReport report) throws JSONException {
+  private void getFsNamesystemMetrics(String address) {
     String query = "Hadoop:service=NameNode,name=FSNamesystem*";
-    JSONArray aux = FederationUtil.getJmx(
-        query, address, connectionFactory, scheme);
+    this.fsNamesystemMetrics = FederationUtil.getJmx(query, address, connectionFactory, scheme);
+  }
+
+  /**
+   * Populates FSNamesystem* metrics into report.
+   * @param aux FSNamesystem* metrics from namenode.
+   * @param report Namenode status report to update with JMX data.
+   * @throws JSONException When an invalid JSONObject is found.
+   */
+  private void populateFsNamesystemMetrics(JSONArray aux, NamenodeStatusReport report)
+      throws JSONException {
     if (aux != null) {
       for (int i = 0; i < aux.length(); i++) {
         JSONObject jsonObject = aux.getJSONObject(i);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index f47d6ce..3230af8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -115,6 +115,9 @@
       FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
   public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
       TimeUnit.SECONDS.toMillis(5);
+  public static final String DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS =
+      FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.jmx.interval";
+  public static final long DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT = 0;
 
   // HDFS Router NN client
   public static final String
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index c7b403c..8322a72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -472,6 +472,16 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.namenode.heartbeat.jmx.interval</name>
+    <value>0</value>
+    <description>
+      How often the Router should request JMX reports from the Namenode in milliseconds.
+      If this value is 0, it will request JMX reports every time a Namenode report is requested.
+      If this value is negative, it will disable JMX reports from the Namenode.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.store.router.expiration</name>
     <value>5m</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index bae2dea..9ee9692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -34,6 +34,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -293,7 +294,32 @@
     verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
   }
 
+  @Test
+  public void testJmxRequestFrequency() {
+    // Disable JMX requests
+    Configuration conf = getNamenodesConfig();
+    conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS, -1);
+    verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 0, 1);
+
+    // Set JMX requests to lower frequency
+    conf = getNamenodesConfig();
+    conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
+        TimeUnit.MINUTES.toMillis(5));
+    verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 1, 2);
+
+    // Set JMX requests to default frequency
+    conf = getNamenodesConfig();
+    verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 2, 2);
+  }
+
   private void verifyUrlSchemes(String scheme) {
+    int httpRequests = HttpConfig.Policy.HTTP_ONLY.name().equals(scheme) ? 1 : 0;
+    int httpsRequests = HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme) ? 1 : 0;
+    verifyUrlSchemes(scheme, getNamenodesConfig(), httpRequests, httpsRequests, 1);
+  }
+
+  private void verifyUrlSchemes(String scheme, Configuration conf, int httpRequests,
+      int httpsRequests, int requestsPerService) {
 
     // Attach our own log appender so we can verify output
     final LogVerificationAppender appender =
@@ -304,7 +330,6 @@
     GenericTestUtils.setRootLogLevel(Level.DEBUG);
 
     // Setup and start the Router
-    Configuration conf = getNamenodesConfig();
     conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
     Configuration routerConf = new RouterConfigBuilder(conf)
         .heartbeat(true)
@@ -318,15 +343,12 @@
     Collection<NamenodeHeartbeatService> heartbeatServices =
         router.getNamenodeHeartbeatServices();
     for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
-      heartbeatService.getNamenodeStatusReport();
+      for (int request = 0; request < requestsPerService; request++) {
+        heartbeatService.getNamenodeStatusReport();
+      }
     }
-    if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
-      assertEquals(2, appender.countLinesWithMessage("JMX URL: https://"));
-      assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
-    } else {
-      assertEquals(2, appender.countLinesWithMessage("JMX URL: http://"));
-      assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
-    }
+    assertEquals(httpsRequests * 2, appender.countLinesWithMessage("JMX URL: https://"));
+    assertEquals(httpRequests * 2, appender.countLinesWithMessage("JMX URL: http://"));
   }
 
   /**