Merge pull request #1437 from MabelYC/dupLog

SAMZA-2598: Kafka clients are not created and closed properly
diff --git a/KEYS b/KEYS
index 3cc4efc..ee681c1 100644
--- a/KEYS
+++ b/KEYS
@@ -6,11 +6,14 @@
   Create a key:
     gpg --gen-key
 
+  Private keys are stored in ~/.gnupg
+
   Adding your key to this file:
     (gpg --list-sigs <key id> && gpg --armor --export <key id>) >> this file.
 
   Publish the key:
     gpg --keyserver pgp.mit.edu --send-keys <key id>
+    gpg --keyserver keyserver.ubuntu.com --send-keys <key id>
 
   Signing another developers key:
     gpg --keyserver pgp.mit.edu --search-keys <name or email>
@@ -18,6 +21,13 @@
     gpg --sign-key <key id>
     gpg --keyserver pgp.mit.edu --send-keys <key id>
 
+  Signing a release with the key *may* require some of the following steps:
+  git config --global gpg.program gpg2
+  git config --global user.signingkey D2103453 // where D2103453 is your key
+
+  export GPG_TTY=$(tty) // to allow passphrase entry
+  
+
   Additional Information:
     http://www.apache.org/dev/openpgp.html#generate-key
 
@@ -918,3 +928,40 @@
 VcSdqZkA/iKbC7Fv6xS7fTmhnLmAJ40h
 =Ja6V
 -----END PGP PUBLIC KEY BLOCK-----
+pub   2048R/D2103453 2020-11-19 [expires: 2023-11-19]
+uid                  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sig 3        D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sub   2048R/4C01BF41 2020-11-19 [expires: 2023-11-19]
+sig          D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2.0.22 (GNU/Linux)
+
+mQENBF+2rJsBCACpNvGmT730xGXb0/lELUhzBu10BhdXfkr6FupYU6IxNMSZc/W4
+LKjB0ptF/OfodjdOhA2Gbv0kZt4mRBgmwe+vIJkFAXOZccq+G708ZPBsjifUDESN
+oG9/juCJSSrYGcclWiPg0Fe3bBWm5KPWSkYG1OnHTWnqT7a5n6comgZmNori5tmA
+aPN/RnEywEY7vnqyvcJnfPKysvvaoTUbfoUaiGIVGwprYnQ3huvq+o81etWAOVvZ
+qry+6m6fA5l35bmr29xlbiK7lYCShie7u03U9eRiaLGevFSziViq/cp7OlcAXVN4
+hGtuXh+NVMgaxsy7HIs4H+x9oc9IO+WpdNS5ABEBAAG0M0JvcmlzIFNoa29sbmlr
+IChMaW5rZWRpbiBTYW16YSkgPGJvcnlhc0BhcGFjaGUub3JnPokBPwQTAQIAKQUC
+X7asmwIbAwUJBaOagAcLCQgHAwIBBhUIAgkKCwQWAgMBAh4BAheAAAoJED6OsXvS
+EDRTz4IH/iWHlHFlvasnLCgVU+29NMrx+jFRXMG5pDzd/bNocrlD+E42ocpOU1vo
+bGwvey0I4Hi8DCGKR2p14IAs2tzaKYZR0kel1QesciI+ngyE/zWEI3xKv8OXiS8A
+XdQC1Eq8Q+BJhun3A8zuQU095r9/sygTFBWlOtQ1Wn2qJ5oC6005qDDbyDAbEsxp
+4m+VaM7faHGjpNOSekHCJW2wue55I+IEF/eH8BsRbw6DGlIFhspsl17wJQAcnmRu
+wakPKZGtEyjgwVYdmUpne7GW87n18JdT/6ywfTqw17/ly0zBJhl2M0JTq0d0+Z9P
+NFwt1pWDDJOtmMClMrj3PcAk5DVVPFG5AQ0EX7asmwEIALMe7kvm6j5bgrj2fZVg
+663bqvTSdn90WQ+lQgpURYXMz4VAFNKlaqDGOX2L94vdI4pIEvPfedQ0C9wZQFz3
+NujaRL4gsxx9CHolKzTs68DE7pyiikzpZFuSIKfv0bukSmzqgvz83VKH33jAZiqd
+HFzRh6ad0iMhDpF4g//E6SLnqJdrf335JAB1P/v9LH579XWfbGWegIkXOwG8QTPB
+LSslAKah8JFBILQfeL4MmbJnwb/BQqLIWVyK5hjzon+mxKEWHQNLaxM4R517JCFQ
+DNzBYyoqO517XXEbCdUQ/ZXCRH2+gCFEWj9kA2b6Frqf4B1PD8BUWBjBYDv1RNNd
+kfsAEQEAAYkBJQQYAQIADwUCX7asmwIbDAUJBaOagAAKCRA+jrF70hA0U/trB/9G
+gqyT6JSlmBY+8ep9NW+bDaoWQZY0f4kAR2JyehUM8kLrLqB7D3moah7XrR6nOoWt
+e56yp81+3hGLTcHK2WKapTgYssPrm/nv7NySMUw04UvRMrn+wV5U+7MUtdBxO23R
+hSRuqb40wPBzo97Lb7LOEqLF0OHNMiFSV1qLuhyDGg87zEkfJM+o52Y/CT9nh60Z
+5OXa1fCQhpPC29d7OsBJklE3EtwokCyeeqxJFpcDBNTqM+5GLWbRZp6YQfnNdoPB
+XgSlVbifAXWaMZr8kFpNF6H+zm6tZ3rclihqgWhUqrAAmT11qtMacbK8dRVw3im5
+oZPFXmvDQy/353sy1WRs
+=MLmR
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/RELEASE.md b/RELEASE.md
index 9b34f78..cd7fe90 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -53,7 +53,7 @@
     ./gradlew clean publishToMavenLocal
 ```
 
-To build a tarball suitable for an ASF source release (and its accompanying MD5 file):
+To build a tarball suitable for an ASF source release (and its accompanying md5 file):
 
 First, clean any non-checked-in files from git (this removes all such files without prompting):
 
@@ -179,7 +179,7 @@
    cd samza-dist
    mkdir $VERSION
    cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz $VERSION
-   cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.MD5 $VERSION
+   cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.md5 $VERSION
    cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.asc $VERSION
    svn add $VERSION
    ```
diff --git a/build.gradle b/build.gradle
index e30776a..2931169 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,6 +82,7 @@
     'docs/_site/**',
     'docs/sitemap.xml',
     'docs/learn/documentation/*/api/javadocs/**',
+    'docs/learn/documentation/*/rest/javadocs/**',
     'docs/Gemfile.lock',
     'gradle/wrapper/**',
     'gradlew',
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 932c4c8..1caf88e 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -324,6 +324,9 @@
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|yarn.am.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.|
+|yarn.container.heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with AM, this count gives the number of times an already running container will attempt to establish heartbeat with new AM|
+|yarn.container.heartbeat.retry-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|
 
 ##### <a name="advanced-cluster-configurations"></a>[5.1.1 Advanced Cluster Configurations](#advanced-cluster-configurations)
 |Name|Default|Description|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 63ee3c7..b98c727 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -34,6 +34,7 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
@@ -45,6 +46,7 @@
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -206,6 +208,12 @@
     this.localityManager =
         new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
 
+    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+      ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+          new NamespaceAwareCoordinatorStreamStore(metadataStore, SetExecutionEnvContainerIdMapping.TYPE));
+
+      state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping());
+    }
     // build metastore for container placement messages
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
 
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..fa5f783 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -234,6 +234,13 @@
         if (isRequestExpired(request)) {
           updateExpiryMetrics(request);
           containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+          // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
+          // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
+          if (!hostAffinityEnabled) {
+            LOG.info("Waiting for resources to get allocated for request {},"
+                + " no retries will be issued since host affinity is disabled", request);
+            break;
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -447,7 +454,7 @@
    * @param request the request to check
    * @return true if request has expired
    */
-  private boolean isRequestExpired(SamzaResourceRequest request) {
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
     long currTime = Instant.now().toEpochMilli();
     boolean requestExpired =  currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
     if (requestExpired) {
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 1bc1669..a2ad540 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -241,7 +241,6 @@
 
     state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
     state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
-
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
@@ -334,7 +333,6 @@
       return;
     }
     state.runningProcessors.remove(processorId);
-
     int exitStatus = resourceStatus.getExitCode();
     switch (exitStatus) {
       case SamzaResourceStatus.SUCCESS:
@@ -413,7 +411,6 @@
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
-
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 784f0b4..930d366 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -106,6 +106,17 @@
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
   /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processorId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to processors currently running or from previous attempt or previous deploy.
+   * TODO: SAMZA-2607 : remove this map and all its usages.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);
+
+  /**
    *  Map of the failed Samza processor ID to resource status of the last attempted of the container.
    *  This map is only used when {@link org.apache.samza.config.ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES}
    *  is set to false, this map tracks the containers which have exhausted all retires for restart and JobCoordinator is
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b274e9..021c67d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -147,6 +147,21 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
+  // High availability allows new AM to establish connection with already running containers
+  public static final String YARN_AM_HIGH_AVAILABILITY_ENABLED = "yarn.am.high-availability.enabled";
+  public static final boolean YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT = false;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with AM,
+  // this count gives the number of times an already running container will attempt to establish heartbeat with new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_COUNT = "yarn.container.heartbeat.retry.count";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT = 5;
+
+  // If AM-HA is enabled, when a running container loses heartbeat with AM,
+  // this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -398,6 +413,19 @@
     return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
   }
 
+  public boolean getApplicationMasterHighAvailabilityEnabled() {
+    return getBoolean(YARN_AM_HIGH_AVAILABILITY_ENABLED, YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetryCount() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_COUNT, YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetrySleepDurationMs() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS,
+        YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT);
+  }
+
   /**
    * Get config loader factory according to the configs
    * @return full qualified name of {@link ConfigLoaderFactory}
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index e6208ba..8a90549 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -25,6 +25,7 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.stream.Collectors;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -56,7 +57,7 @@
 
   public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
     this.heartbeatEndpoint =
-        String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+        String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, coordinatorUrl, executionEnvContainerId);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 89b5fc9..7e20f26 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,26 +38,47 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+
   @VisibleForTesting
   static final int SCHEDULE_MS = 60000;
   @VisibleForTesting
   static final int SHUTDOWN_TIMOUT_MS = 120000;
 
   private final Runnable onContainerExpired;
-  private final ContainerHeartbeatClient containerHeartbeatClient;
   private final ScheduledExecutorService scheduler;
+  private final String containerExecutionId;
+  private final MetadataStore coordinatorStreamStore;
+  private final long sleepDurationForReconnectWithAM;
+  private final boolean isApplicationMasterHighAvailabilityEnabled;
+  private final long retryCount;
+
+  private ContainerHeartbeatClient containerHeartbeatClient;
+  private String coordinatorUrl;
   private boolean started = false;
 
-  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
-    this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
+    this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
+        Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
+        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
-      ScheduledExecutorService scheduler) {
+      ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
+      long retryCount, long sleepDurationForReconnectWithAM) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
+    this.coordinatorUrl = coordinatorUrl;
+    this.containerExecutionId = containerExecutionId;
+    this.coordinatorStreamStore = coordinatorStreamStore;
+    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
+    this.retryCount = retryCount;
+    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
   }
 
   public void start() {
@@ -65,6 +90,12 @@
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        if (isApplicationMasterHighAvailabilityEnabled) {
+          LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
+          if (checkAndEstablishConnectionWithNewAM()) {
+            return;
+          }
+        }
         scheduler.schedule(() -> {
           // On timeout of container shutting down, force exit.
           LOG.error("Graceful shutdown timeout expired. Force exiting.");
@@ -84,6 +115,38 @@
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= retryCount) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = createContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, String containerExecutionId) {
+    return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git a/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
new file mode 100644
index 0000000..557ef77
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza processor id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the processor container id mapping.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Processor {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);
+    metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Processor {} has executionEnvContainerId as {}", entry.getKey(), entry.getValue());
+      }
+    }
+    return executionEnvironmentContainerIdMapping;
+  }
+
+  public void close() {
+    metadataStore.close();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index d7a648b..22268a8 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -27,4 +27,13 @@
   public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
   public static final String RUNID_LOCK_ID = "runId";
   public static final int LOCK_TIMEOUT_MS = 300000;
+
+  // Yarn coordination constants for heartbeat
+  public static final String YARN_CONTAINER_HEARTBEAT_SERVELET = "containerHeartbeat";
+  public static final String YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID = "executionContainerId";
+  public static final String YARN_COORDINATOR_URL = "yarn.am.tracking.url";
+  private static final String YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT = "%s" + YARN_CONTAINER_HEARTBEAT_SERVELET;
+  private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
+  public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
+      YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index 9b862bd..86983f1 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -23,6 +23,7 @@
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -51,7 +52,10 @@
   public String fromBytes(byte[] bytes) {
     Map<String, Object> values = messageSerde.fromBytes(bytes);
     CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[]{}, values);
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping executionContainerIdMapping = new SetExecutionEnvContainerIdMapping(message);
+      return executionContainerIdMapping.getExecutionEnvironmentContainerId();
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(message);
       return hostMapping.getHostLocality();
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
@@ -76,7 +80,11 @@
 
   @Override
   public byte[] toBytes(String value) {
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping
+          executionEnvContainerIdMapping = new SetExecutionEnvContainerIdMapping(SOURCE, "", value);
+      return messageSerde.toBytes(executionEnvContainerIdMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, "", "");
       return messageSerde.toBytes(hostMapping.getMessageMap());
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
new file mode 100644
index 0000000..508b1df
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetExecutionEnvContainerIdMapping is used internally by the Samza framework to
+ * persist the processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ProcessorId
+ *     Type: set-execution-env-container-id-mapping
+ *     Source: "SamzaContainer-$ProcessorId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionEnvContainerIdMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-execution-env-container-id-mapping";
+  public static final String EXEC_ENV_ID_KEY = "execution-env-container-id";
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param message which holds the processor id to execution environment id mapping information.
+   */
+  public SetExecutionEnvContainerIdMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param source the source of the message
+   * @param key the key which is used to persist the message
+   * @param executionEnvContainerId the execution environment container id
+   */
+  public SetExecutionEnvContainerIdMapping(String source, String key, String executionEnvContainerId) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(EXEC_ENV_ID_KEY, executionEnvContainerId);
+  }
+
+  public String getExecutionEnvironmentContainerId() {
+    return getMessageValue(EXEC_ENV_ID_KEY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a5148fb..459ad89 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -30,8 +30,8 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
@@ -39,9 +39,12 @@
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
@@ -97,7 +100,7 @@
       String jobName,
       String jobId,
       String containerId,
-      Optional<String> execEnvContainerId,
+      Optional<String> execEnvContainerIdOptional,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -117,7 +120,7 @@
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional, config);
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -142,11 +145,24 @@
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      ContainerHeartbeatMonitor heartbeatMonitor =
+          createContainerHeartbeatMonitor(container,
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE),
+              jobConfig.getApplicationMasterHighAvailabilityEnabled(), jobConfig.getContainerHeartbeatRetryCount(),
+              jobConfig.getContainerHeartbeatRetrySleepDurationMs());
       if (heartbeatMonitor != null) {
         heartbeatMonitor.start();
       }
 
+      if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+        execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+          ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
+          executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
+        });
+      }
+
       container.run();
       if (heartbeatMonitor != null) {
         heartbeatMonitor.stop();
@@ -189,9 +205,15 @@
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isApplicaitonMasterHighAvailabilityEnabled whether AM HA is enabled to fetch new AM url
+   * @param retryCount number of times to retry connecting to new AM when heartbeat expires
+   * @param sleepDurationForReconnectWithAM sleep duration between retries to connect to new AM when heartbeat expires
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
    */
-  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
+      MetadataStore coordinatorStreamStore, boolean isApplicaitonMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
@@ -204,7 +226,8 @@
           log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
           System.exit(1);
         }
-      }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, isApplicaitonMasterHighAvailabilityEnabled,
+          retryCount, sleepDurationForReconnectWithAM);
     } else {
       log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
       return null;
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
 
 public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
+  private Semaphore expiredRequestSemaphore = new Semaphore(0);
+  private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+  private volatile boolean overrideIsRequestExpired = false;
+
+  // Create a MockContainerAllocator with certain config overrides
+  public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+      ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+      ContainerManager containerManager, Config overrideConfig) {
+    Map<String, String> mergedConfig = new HashMap<>();
+    mergedConfig.putAll(config);
+    mergedConfig.putAll(overrideConfig);
+    return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+  }
+
   public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
                                 Config config, SamzaApplicationState state, ContainerManager containerManager) {
     super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
     super.requestResources(processorToHostMapping);
   }
 
+  public void setOverrideIsRequestExpired() {
+    overrideIsRequestExpired = true;
+  }
+
+  public int getExpiredRequestCallCount() {
+    return expiredRequestCallCount.get();
+  }
+
+  @Override
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
+    if (!overrideIsRequestExpired) {
+      // if not set to override, then return the original result
+      return super.isRequestExpired(request);
+    }
+    expiredRequestSemaphore.release();
+    expiredRequestCallCount.incrementAndGet();
+    return true;
+  }
+
+  public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+    return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   public ResourceRequestState getContainerRequestState() throws Exception {
     Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
     field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..ac5d6f3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -165,6 +166,56 @@
   }
 
   /**
+   * See SAMZA-2601: we want to prevent an infinite loop in the case of expired request call with host affinity
+   * disabled. This test make sure we don't have that infinite loop.
+   */
+  @Test
+  public void testExpiredRequestInfiniteLoop() throws Exception {
+    Config override = new MapConfig(new HashMap<String, String>() {
+      {
+        // override to have a proper sleep interval for this test
+        put("cluster-manager.allocator.sleep.ms", "100");
+      }
+    });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    containerAllocator =
+        MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            override);
+    MockContainerAllocatorWithoutHostAffinity mockAllocator =
+        (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+    mockAllocator.setOverrideIsRequestExpired();
+    allocatorThread = new Thread(containerAllocator);
+
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", null);
+        put("1", null);
+        put("2", null);
+        put("3", null);
+      }
+    };
+
+    allocatorThread.start();
+
+    mockAllocator.requestResources(containersToHostMapping);
+    // Wait until at least one expired request call has been made, which should happen.
+    // If the test passes, this should return immediately (within 100 ms). Only when the test fails will it exhaust the
+    // timeout, which is worth the wait to find out the failure
+    assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
+    Thread.sleep(500);
+    // Given that we wait for 500 ms above, and a sleep interval of 100 ms, we should roughly see 5 times the
+    // isRequestExpired is called. We give some extra buffer here (<100). Because if we do run into infinite loop,
+    // isRequestExpired would be called MILLIONS of times (4~5 million times after a dozen runs on my machine).
+    assertTrue(
+        String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+        mockAllocator.getExpiredRequestCallCount() < 100);
+  }
+
+  /**
    * Test requestContainers with containerToHostMapping with host.affinity disabled
    */
   @Test
@@ -294,6 +345,8 @@
     spyThread = new Thread(spyAllocator, "Container Allocator Thread");
     // Start the container allocator thread periodic assignment
     spyThread.start();
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
     Thread.sleep(1000);
     // Verify that all the request that were created were "ANY_HOST" requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 2445c00..ff297d1 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -22,6 +22,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -33,6 +37,7 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -42,6 +47,8 @@
   private Runnable onExpired;
   @Mock
   private ContainerHeartbeatClient containerHeartbeatClient;
+  @Mock
+  private MetadataStore coordinatorStreamStore;
 
   private ScheduledExecutorService scheduler;
   /**
@@ -51,13 +58,18 @@
 
   private ContainerHeartbeatMonitor containerHeartbeatMonitor;
 
+  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
+  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
+  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
     this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
     this.containerHeartbeatMonitor =
-        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, false, 5, 10);
   }
 
   @Test
@@ -93,6 +105,75 @@
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should not have been submitted
+    verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+    verify(this.onExpired, never()).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testFailedToFetchNewAMCoordinatorUrl() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(COORDINATOR_URL));
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testConnectToNewAMFailed() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
new file mode 100644
index 0000000..2ecd88c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionContainerIdManager {
+
+  private static final Config
+      CONFIG = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
+  private MetadataStore store;
+  private ExecutionContainerIdManager executionContainerIdManager;
+
+  @Before
+  public void setup() {
+    coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    store = Mockito.spy(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
+        SetExecutionEnvContainerIdMapping.TYPE));
+    executionContainerIdManager = new ExecutionContainerIdManager(store);
+
+  }
+
+  @After
+  public void tearDown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
+  @Test
+  public void testExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = "0";
+
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+    Map<String, String> localMap = executionContainerIdManager.readExecutionEnvironmentContainerIdMapping();
+
+    Map<String, String> expectedMap = ImmutableMap.of(processorId, physicalId);
+    assertEquals(expectedMap, localMap);
+
+    executionContainerIdManager.close();
+
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer producer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer consumer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+
+    ArgumentCaptor<byte[]> argument1 = ArgumentCaptor.forClass(byte[].class);
+    Mockito.verify(store).put(Mockito.eq(processorId), argument1.capture());
+    CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+    assertEquals(physicalId, valueSerde.fromBytes(argument1.getValue()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInvalidKeyExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = null;
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+  @Test(expected = NullPointerException.class)
+  public void testInvalidValueExecutionContainerIdManager() {
+    String physicalId = null;
+    String processorId = "0";
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+}
+
diff --git a/samza-test/src/main/config/join/README b/samza-test/src/main/config/join/README
index bf9a385..1bb220e 100644
--- a/samza-test/src/main/config/join/README
+++ b/samza-test/src/main/config/join/README
@@ -44,17 +44,17 @@
 > cd $HELLO_SAMZA_SRC
 > for i in zookeeper kafka yarn; do ./bin/grid install $i; ./bin/grid start $i; done
 
-Update the "yarn.package.path" to $DEPLOY_DIR/samza-test_2.11-1.5.0-SNAPSHOT.tgz
+Update the "yarn.package.path" to $DEPLOY_DIR/samza-test_2.11-1.6.0-SNAPSHOT.tgz
 > cd $SAMZA_SRC
 > vi samza-test/src/main/config/join/common.properties
-yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.5.0-SNAPSHOT.tgz
+yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.6.0-SNAPSHOT.tgz
 
 Then release and extract the test tarball:
 > cd $SAMZA_SRC
 > ./gradlew releaseTestJobs
-> cp samza-test/build/distributions/samza-test_2.11-1.5.0-SNAPSHOT.tgz $DEPLOY_DIR
+> cp samza-test/build/distributions/samza-test_2.11-1.6.0-SNAPSHOT.tgz $DEPLOY_DIR
 > mkdir $DEPLOY_DIR/samza
-> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.5.0-SNAPSHOT.tgz -C $DEPLOY_DIR/samza
+> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.6.0-SNAPSHOT.tgz -C $DEPLOY_DIR/samza
 
 Finally, create the kafka topics and start the samza jobs:
 > ./bin/setup-int-test.sh $DEPLOY_DIR
diff --git a/samza-test/src/main/python/configs/tests.json b/samza-test/src/main/python/configs/tests.json
index 1e54929..7f5d5da 100644
--- a/samza-test/src/main/python/configs/tests.json
+++ b/samza-test/src/main/python/configs/tests.json
@@ -1,5 +1,5 @@
 {
-  "samza_executable": "samza-test_2.11-1.5.0-SNAPSHOT.tgz",
+  "samza_executable": "samza-test_2.11-1.6.0-SNAPSHOT.tgz",
   "samza_install_path": "deploy/smoke_tests",
   "samza_config_loader_factory": "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory"
 }
diff --git a/samza-test/src/main/python/stream_processor.py b/samza-test/src/main/python/stream_processor.py
index dabf8e4..f42590b 100644
--- a/samza-test/src/main/python/stream_processor.py
+++ b/samza-test/src/main/python/stream_processor.py
@@ -43,7 +43,7 @@
         logger.info('Running processor start command: {0}'.format(self.processor_start_command))
         self.deployment_config = {
             'install_path': os.path.join(runtime.get_active_config('remote_install_path'), 'deploy/{0}'.format(self.processor_id)),
-            'executable': 'samza-test_2.11-1.5.0-SNAPSHOT.tgz',
+            'executable': 'samza-test_2.11-1.6.0-SNAPSHOT.tgz',
             'post_install_cmds': [],
             'start_command': self.processor_start_command,
             'stop_command': '',
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
index 3ed3928..b683d35 100644
--- a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -27,6 +27,7 @@
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -47,7 +48,6 @@
  */
 public class YarnContainerHeartbeatServlet extends HttpServlet {
 
-  private static final String YARN_CONTAINER_ID = "executionContainerId";
   private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
   private static final String APPLICATION_JSON = "application/json";
   private static final String GROUP = SamzaAppMasterMetrics.class.getName();
@@ -67,7 +67,7 @@
       throws ServletException, IOException {
     ContainerId yarnContainerId;
     PrintWriter printWriter = resp.getWriter();
-    String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+    String containerIdParam = req.getParameter(CoordinationConstants.YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID);
     ContainerHeartbeatResponse response;
     resp.setContentType(APPLICATION_JSON);
     boolean alive = false;
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f436f79..0f512ad 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -23,6 +23,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
+import org.apache.samza.coordinator.CoordinationConstants
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
@@ -40,7 +41,7 @@
 class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends  Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
-  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  val SERVER_URL_OPT: String = CoordinationConstants.YARN_COORDINATOR_URL;
   var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
   def onInit() {
@@ -56,7 +57,8 @@
     webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
     webApp.start
 
-    samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
+    samzaAppState.jobModelManager.server.addServlet("/" + CoordinationConstants.YARN_CONTAINER_HEARTBEAT_SERVELET,
+      new YarnContainerHeartbeatServlet(state, registry))
     samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index 8987834..0901d85 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -24,6 +24,7 @@
 import junit.framework.Assert;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -76,7 +77,7 @@
     String validContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + validContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, webApp.getUrl().toString(), validContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertTrue(heartbeat.isAlive());
@@ -89,7 +90,8 @@
     String invalidContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + invalidContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT,
+        webApp.getUrl().toString(), invalidContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertFalse(heartbeat.isAlive());