YARN-9425. Make initialDelay configurable for FederationStateStoreService#scheduledExecutorService (#4731). Contributed by  groot and Shen Yinjie.

Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c1fa8c0..e8e0467 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3920,6 +3920,13 @@
   public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
       "yarnfederation/";
 
+  public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY =
+      FEDERATION_PREFIX + "state-store.heartbeat.initial-delay";
+
+  // 30 secs
+  public static final int
+      DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30;
+
   public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
       FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 68d8ed9..313ac8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3626,6 +3626,16 @@
   </property>
   <property>
     <description>
+      Initial delay for federation state-store heartbeat service. Value is followed by a unit
+      specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds,
+      minutes, hours, days respectively. Values should provide units,
+      but seconds are assumed
+    </description>
+    <name>yarn.federation.state-store.heartbeat.initial-delay</name>
+    <value>30s</value>
+  </property>
+  <property>
+    <description>
       Machine list file to be loaded by the FederationSubCluster Resolver
     </description>
     <name>yarn.federation.machine-list</name>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
index 6fb43ca..a473186 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -96,6 +96,7 @@
   private FederationStateStore stateStoreClient = null;
   private SubClusterId subClusterId;
   private long heartbeatInterval;
+  private long heartbeatInitialDelay;
   private RMContext rmContext;
 
   public FederationStateStoreService(RMContext rmContext) {
@@ -126,10 +127,24 @@
     heartbeatInterval = conf.getLong(
         YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
         YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
+
     if (heartbeatInterval <= 0) {
       heartbeatInterval =
           YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
     }
+
+    heartbeatInitialDelay = conf.getTimeDuration(
+        YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
+        TimeUnit.SECONDS);
+
+    if (heartbeatInitialDelay <= 0) {
+      LOG.warn("{} configured value is wrong, must be > 0; using default value of {}",
+          YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
+          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY);
+      heartbeatInitialDelay =
+          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
+    }
     LOG.info("Initialized federation membership service.");
 
     super.serviceInit(conf);
@@ -206,9 +221,9 @@
     scheduledExecutorService =
         HadoopExecutors.newSingleThreadScheduledExecutor();
     scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
-        heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
-    LOG.info("Started federation membership heartbeat with interval: {}",
-        heartbeatInterval);
+        heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS);
+    LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}",
+        heartbeatInterval, heartbeatInitialDelay);
   }
 
   @VisibleForTesting
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
index e5e156d..e8ebdd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -173,4 +174,37 @@
     return response.getCapability();
   }
 
+  @Test
+  public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Exception {
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    GenericTestUtils.LogCapturer logCapture =
+        GenericTestUtils.LogCapturer.captureLogs(FederationStateStoreService.LOG);
+
+    final MockRM rm = new MockRM(conf);
+
+    // Initially there should be no entry for the sub-cluster
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
+    Assert.assertNull(response);
+
+    // Validate if sub-cluster is registered
+    rm.start();
+    String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
+    Assert.assertTrue(capability.isEmpty());
+
+    // Heartbeat to see if sub-cluster transitions to running
+    FederationStateStoreHeartbeat storeHeartbeat =
+        rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 0);
+
+    Assert.assertTrue(logCapture.getOutput().contains(
+        "Started federation membership heartbeat with interval: 300 and initial delay: 10"));
+    rm.stop();
+  }
 }