SAMZA-2602: Dynamic heartbeat establishment with new AM (#1442)

- Introduce config for cluster-based job coordinator high availability
- Dynamic heartbeat establishment with new ApplicationMaster
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 932c4c8..1caf88e 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -324,6 +324,9 @@
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|yarn.am.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA), where a new AM can establish a connection with already running containers.|
+|yarn.container.heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with the AM, this count gives the number of times an already running container will attempt to establish heartbeat with the new AM.|
+|yarn.container.heartbeat.retry-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|
 
 ##### <a name="advanced-cluster-configurations"></a>[5.1.1 Advanced Cluster Configurations](#advanced-cluster-configurations)
 |Name|Default|Description|
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b274e9..021c67d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -147,6 +147,21 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enables ClusterBasedJobCoordinator, aka ApplicationMaster, High Availability (AM-HA). Disabled by default.
+  // High availability allows a new AM to establish a connection with already running containers.
+  public static final String YARN_AM_HIGH_AVAILABILITY_ENABLED = "yarn.am.high-availability.enabled";
+  public static final boolean YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT = false;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with the AM, this count gives the number of
+  // times the already running container will attempt to establish heartbeat with a new AM. Defaults to 5.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_COUNT = "yarn.container.heartbeat.retry.count";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT = 5;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with the AM, this duration (in milliseconds)
+  // is how long the container sleeps between attempts to establish heartbeat with a new AM. Defaults to 10000.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -398,6 +413,19 @@
     return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
   }
 
+  public boolean getApplicationMasterHighAvailabilityEnabled() {
+    return getBoolean(YARN_AM_HIGH_AVAILABILITY_ENABLED, YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetryCount() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_COUNT, YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetrySleepDurationMs() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS,
+        YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT);
+  }
+
   /**
    * Get config loader factory according to the configs
    * @return full qualified name of {@link ConfigLoaderFactory}
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 89b5fc9..7e20f26 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,26 +38,47 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+
   @VisibleForTesting
   static final int SCHEDULE_MS = 60000;
   @VisibleForTesting
   static final int SHUTDOWN_TIMOUT_MS = 120000;
 
   private final Runnable onContainerExpired;
-  private final ContainerHeartbeatClient containerHeartbeatClient;
   private final ScheduledExecutorService scheduler;
+  private final String containerExecutionId;
+  private final MetadataStore coordinatorStreamStore;
+  private final long sleepDurationForReconnectWithAM;
+  private final boolean isApplicationMasterHighAvailabilityEnabled;
+  private final long retryCount;
+
+  private ContainerHeartbeatClient containerHeartbeatClient;
+  private String coordinatorUrl;
   private boolean started = false;
 
-  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
-    this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
+    this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
+        Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
+        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
-      ScheduledExecutorService scheduler) {
+      ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
+      long retryCount, long sleepDurationForReconnectWithAM) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
+    this.coordinatorUrl = coordinatorUrl;
+    this.containerExecutionId = containerExecutionId;
+    this.coordinatorStreamStore = coordinatorStreamStore;
+    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
+    this.retryCount = retryCount;
+    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
   }
 
   public void start() {
@@ -65,6 +90,12 @@
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        if (isApplicationMasterHighAvailabilityEnabled) {
+          LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
+          if (checkAndEstablishConnectionWithNewAM()) {
+            return;
+          }
+        }
         scheduler.schedule(() -> {
           // On timeout of container shutting down, force exit.
           LOG.error("Graceful shutdown timeout expired. Force exiting.");
@@ -84,6 +115,38 @@
     }
   }
 
+  /** Looks up a new AM url in the coordinator stream store, making up to retryCount attempts.
+   *  @return true only if a new AM url was found and that AM answered a heartbeat request as alive */
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    for (int attempt = 1; attempt <= retryCount; attempt++) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      // A missing (null) url is handled like an unchanged one: there is no new AM to talk to yet.
+      if (newCoordinatorUrl != null && !coordinatorUrl.equals(newCoordinatorUrl)) {
+        LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+        coordinatorUrl = newCoordinatorUrl;
+        containerHeartbeatClient = createContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+        boolean response = containerHeartbeatClient.requestHeartbeat().isAlive();
+        LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+        return response;
+      }
+      LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+      try {
+        Thread.sleep(sleepDurationForReconnectWithAM);
+      } catch (InterruptedException e) {
+        // Restore the flag and give up: once interrupted, every later sleep() would throw immediately,
+        // so looping on would only burn the remaining attempts without actually waiting for a new AM.
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, String containerExecutionId) {
+    return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a5148fb..ec477c9 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -30,7 +30,6 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
@@ -39,9 +38,11 @@
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
@@ -142,7 +143,12 @@
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      ContainerHeartbeatMonitor heartbeatMonitor =
+          createContainerHeartbeatMonitor(container,
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE),
+              jobConfig.getApplicationMasterHighAvailabilityEnabled(), jobConfig.getContainerHeartbeatRetryCount(),
+              jobConfig.getContainerHeartbeatRetrySleepDurationMs());
       if (heartbeatMonitor != null) {
         heartbeatMonitor.start();
       }
@@ -189,9 +195,15 @@
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isApplicaitonMasterHighAvailabilityEnabled whether AM HA is enabled to fetch new AM url
+   * @param retryCount number of times to retry connecting to new AM when heartbeat expires
+   * @param sleepDurationForReconnectWithAM sleep duration between retries to connect to new AM when heartbeat expires
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
    */
-  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
+      MetadataStore coordinatorStreamStore, boolean isApplicaitonMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
@@ -204,7 +216,8 @@
           log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
           System.exit(1);
         }
-      }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, isApplicaitonMasterHighAvailabilityEnabled,
+          retryCount, sleepDurationForReconnectWithAM);
     } else {
       log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
       return null;
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 2445c00..ff297d1 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -22,6 +22,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -33,6 +37,7 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -42,6 +47,8 @@
   private Runnable onExpired;
   @Mock
   private ContainerHeartbeatClient containerHeartbeatClient;
+  @Mock
+  private MetadataStore coordinatorStreamStore;
 
   private ScheduledExecutorService scheduler;
   /**
@@ -51,13 +58,18 @@
 
   private ContainerHeartbeatMonitor containerHeartbeatMonitor;
 
+  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
+  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
+  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
     this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
     this.containerHeartbeatMonitor =
-        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, false, 5, 10);
   }
 
   @Test
@@ -93,6 +105,75 @@
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should not have been submitted
+    verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+    verify(this.onExpired, never()).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testFailedToFetchNewAMCoordinatorUrl() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(COORDINATOR_URL));
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testConnectToNewAMFailed() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.