Merge branch '1.6.0' of https://github.com/apache/samza into 1.6.0
diff --git a/KEYS b/KEYS
index 3cc4efc..ee681c1 100644
--- a/KEYS
+++ b/KEYS
@@ -6,11 +6,14 @@
   Create a key:
     gpg --gen-key
 
+  Private keys are stored in ~/.gnupg
+
   Adding your key to this file:
     (gpg --list-sigs <key id> && gpg --armor --export <key id>) >> this file.
 
   Publish the key:
     gpg --keyserver pgp.mit.edu --send-keys <key id>
+    gpg --keyserver keyserver.ubuntu.com --send-keys <key id>
 
   Signing another developers key:
     gpg --keyserver pgp.mit.edu --search-keys <name or email>
@@ -18,6 +21,13 @@
     gpg --sign-key <key id>
     gpg --keyserver pgp.mit.edu --send-keys <key id>
 
+  Signing a release with the key *may* require some of the following steps:
+    git config --global gpg.program gpg2
+    git config --global user.signingkey D2103453   # where D2103453 is your key
+
+    export GPG_TTY=$(tty)   # to allow passphrase entry
+  
+
   Additional Information:
     http://www.apache.org/dev/openpgp.html#generate-key
 
@@ -918,3 +928,40 @@
 VcSdqZkA/iKbC7Fv6xS7fTmhnLmAJ40h
 =Ja6V
 -----END PGP PUBLIC KEY BLOCK-----
+pub   2048R/D2103453 2020-11-19 [expires: 2023-11-19]
+uid                  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sig 3        D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sub   2048R/4C01BF41 2020-11-19 [expires: 2023-11-19]
+sig          D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2.0.22 (GNU/Linux)
+
+mQENBF+2rJsBCACpNvGmT730xGXb0/lELUhzBu10BhdXfkr6FupYU6IxNMSZc/W4
+LKjB0ptF/OfodjdOhA2Gbv0kZt4mRBgmwe+vIJkFAXOZccq+G708ZPBsjifUDESN
+oG9/juCJSSrYGcclWiPg0Fe3bBWm5KPWSkYG1OnHTWnqT7a5n6comgZmNori5tmA
+aPN/RnEywEY7vnqyvcJnfPKysvvaoTUbfoUaiGIVGwprYnQ3huvq+o81etWAOVvZ
+qry+6m6fA5l35bmr29xlbiK7lYCShie7u03U9eRiaLGevFSziViq/cp7OlcAXVN4
+hGtuXh+NVMgaxsy7HIs4H+x9oc9IO+WpdNS5ABEBAAG0M0JvcmlzIFNoa29sbmlr
+IChMaW5rZWRpbiBTYW16YSkgPGJvcnlhc0BhcGFjaGUub3JnPokBPwQTAQIAKQUC
+X7asmwIbAwUJBaOagAcLCQgHAwIBBhUIAgkKCwQWAgMBAh4BAheAAAoJED6OsXvS
+EDRTz4IH/iWHlHFlvasnLCgVU+29NMrx+jFRXMG5pDzd/bNocrlD+E42ocpOU1vo
+bGwvey0I4Hi8DCGKR2p14IAs2tzaKYZR0kel1QesciI+ngyE/zWEI3xKv8OXiS8A
+XdQC1Eq8Q+BJhun3A8zuQU095r9/sygTFBWlOtQ1Wn2qJ5oC6005qDDbyDAbEsxp
+4m+VaM7faHGjpNOSekHCJW2wue55I+IEF/eH8BsRbw6DGlIFhspsl17wJQAcnmRu
+wakPKZGtEyjgwVYdmUpne7GW87n18JdT/6ywfTqw17/ly0zBJhl2M0JTq0d0+Z9P
+NFwt1pWDDJOtmMClMrj3PcAk5DVVPFG5AQ0EX7asmwEIALMe7kvm6j5bgrj2fZVg
+663bqvTSdn90WQ+lQgpURYXMz4VAFNKlaqDGOX2L94vdI4pIEvPfedQ0C9wZQFz3
+NujaRL4gsxx9CHolKzTs68DE7pyiikzpZFuSIKfv0bukSmzqgvz83VKH33jAZiqd
+HFzRh6ad0iMhDpF4g//E6SLnqJdrf335JAB1P/v9LH579XWfbGWegIkXOwG8QTPB
+LSslAKah8JFBILQfeL4MmbJnwb/BQqLIWVyK5hjzon+mxKEWHQNLaxM4R517JCFQ
+DNzBYyoqO517XXEbCdUQ/ZXCRH2+gCFEWj9kA2b6Frqf4B1PD8BUWBjBYDv1RNNd
+kfsAEQEAAYkBJQQYAQIADwUCX7asmwIbDAUJBaOagAAKCRA+jrF70hA0U/trB/9G
+gqyT6JSlmBY+8ep9NW+bDaoWQZY0f4kAR2JyehUM8kLrLqB7D3moah7XrR6nOoWt
+e56yp81+3hGLTcHK2WKapTgYssPrm/nv7NySMUw04UvRMrn+wV5U+7MUtdBxO23R
+hSRuqb40wPBzo97Lb7LOEqLF0OHNMiFSV1qLuhyDGg87zEkfJM+o52Y/CT9nh60Z
+5OXa1fCQhpPC29d7OsBJklE3EtwokCyeeqxJFpcDBNTqM+5GLWbRZp6YQfnNdoPB
+XgSlVbifAXWaMZr8kFpNF6H+zm6tZ3rclihqgWhUqrAAmT11qtMacbK8dRVw3im5
+oZPFXmvDQy/353sy1WRs
+=MLmR
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/RELEASE.md b/RELEASE.md
index 9b34f78..cd7fe90 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -53,7 +53,7 @@
     ./gradlew clean publishToMavenLocal
 ```
 
-To build a tarball suitable for an ASF source release (and its accompanying MD5 file):
+To build a tarball suitable for an ASF source release (and its accompanying md5 file):
 
 First, clean any non-checked-in files from git (this removes all such files without prompting):
 
@@ -179,7 +179,7 @@
    cd samza-dist
    mkdir $VERSION
    cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz $VERSION
-   cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.MD5 $VERSION
+   cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.md5 $VERSION
    cp ${SAMZA_SRC_ROOT}/build/distribution/source/apache-samza-$VERSION-src.tgz.asc $VERSION
    svn add $VERSION
    ```
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 932c4c8..96e994a 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -324,6 +324,9 @@
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|yarn.am.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA), where a new AM can establish a connection with already running containers.|
+|yarn.container.heartbeat.retry.count|5|If AM-HA is enabled and a running container loses its heartbeat with the AM, this count is the number of times the container will attempt to establish a heartbeat with the new AM.|
+|yarn.container.heartbeat.retry-sleep-duration.ms|10000|If AM-HA is enabled and a running container loses its heartbeat with the AM, this duration is the amount of time, in milliseconds, the container will sleep between attempts to establish a heartbeat with the new AM.|
 
 ##### <a name="advanced-cluster-configurations"></a>[5.1.1 Advanced Cluster Configurations](#advanced-cluster-configurations)
 |Name|Default|Description|
@@ -335,6 +338,8 @@
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
+|cluster-manager.fault-domain-aware.standby.enabled|false|When set to true, makes standby container allocation fault-domain aware. Along with this, you will also need to configure `cluster-manager.fault-domain-manager.factory`.|
+|cluster-manager.fault-domain-manager.factory|`org.apache.samza.`<br>`job.yarn.`<br>`YarnFaultDomainManagerFactory`|The fully qualified name of the factory class used to create the fault domain manager for the underlying cluster manager, which is consulted when making standby container allocations fault-domain aware. This configuration applies only when `cluster-manager.fault-domain-aware.standby.enabled` is set to true.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 28acb2a..60e7766 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -44,7 +44,7 @@
   metricsVersion = "2.2.0"
   mockitoVersion = "1.10.19"
   powerMockVersion = "1.6.6"
-  rocksdbVersion = "6.5.3"
+  rocksdbVersion = "6.6.4"
   scalaTestVersion = "3.0.1"
   slf4jVersion = "1.7.7"
   yarnVersion = "2.7.1"
diff --git a/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java b/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
new file mode 100644
index 0000000..464941d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model representing the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnation of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementers of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with the subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Unique and reproducible
+ *    2. Remains unchanged across application attempts / deployments as long as the subset of the configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. The JobModel
+ * has both configuration and a list of container models. We don't account for configuration changes as part of this
+ * identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Unique and reproducible
+ *    2. Remains unchanged across application attempts / deployments as long as the work assignment remains unchanged
+ *
+ * Notes on interface stability - This class is used internally by Samza for job coordinator high availability in the
+ * YARN deployment offering. It may evolve as the scope expands beyond YARN and is hence unstable.
+ *
+ */
+@InterfaceStability.Unstable
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String jobModelId) {
+    Preconditions.checkState(StringUtils.isNotBlank(epochId), "Epoch ID cannot be empty");
+    Preconditions.checkState(StringUtils.isNotBlank(configId), "Config ID cannot be empty");
+    Preconditions.checkState(StringUtils.isNotBlank(jobModelId), "Job Model ID cannot be empty");
+    this.configId = configId;
+    this.epochId = epochId;
+    this.jobModelId = jobModelId;
+  }
+
+  public String getConfigId() {
+    return configId;
+  }
+
+  public String getEpochId() {
+    return this.epochId;
+  }
+
+  public String getJobModelId() {
+    return jobModelId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JobCoordinatorMetadata)) {
+      return false;
+    }
+    JobCoordinatorMetadata metadata = (JobCoordinatorMetadata) o;
+    return Objects.equals(configId, metadata.configId) && Objects.equals(epochId, metadata.epochId)
+        && Objects.equals(jobModelId, metadata.jobModelId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(configId, epochId, jobModelId);
+  }
+
+  @Override
+  public String toString() {
+    return "JobCoordinatorMetadata{" + "configId='" + configId + '\'' + ", epochId='" + epochId + '\''
+        + ", jobModelId='" + jobModelId + '\'' + '}';
+  }
+}
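
For illustration, a minimal usage sketch (not part of this patch) of the metadata comparison described in the class javadoc; the identifier values below are hypothetical, and in practice they are produced by the JobCoordinatorMetadataManager:

```java
import org.apache.samza.job.JobCoordinatorMetadata;

public class JobCoordinatorMetadataExample {
  public static void main(String[] args) {
    // Hypothetical identifiers; constructor order is (epochId, configId, jobModelId).
    JobCoordinatorMetadata previous = new JobCoordinatorMetadata("epoch-1", "config-abc", "jobmodel-123");
    JobCoordinatorMetadata current = new JobCoordinatorMetadata("epoch-1", "config-abc", "jobmodel-456");

    // equals() compares epoch ID, config ID and job model ID, so the job model change is detected here.
    boolean metadataChanged = !previous.equals(current);
    System.out.println("Metadata changed across attempts: " + metadataChanged);
  }
}
```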
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 63ee3c7..08bcfda 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -34,9 +34,11 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
+import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.PartitionChangeException;
@@ -45,6 +47,9 @@
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -62,7 +67,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Implements a JobCoordinator that is completely independent of the underlying cluster
  * manager system. This {@link ClusterBasedJobCoordinator} handles functionality common
@@ -168,6 +172,11 @@
    */
   private JmxServer jmxServer;
 
+  /*
+   * Denotes whether the metadata changed across application attempts. Used only if job coordinator high availability is enabled.
+   */
+  private boolean metadataChangedAcrossAttempts = false;
+
   /**
    * Variable to keep the callback exception
    */
@@ -206,6 +215,12 @@
     this.localityManager =
         new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
 
+    if (isApplicationMasterHighAvailabilityEnabled()) {
+      ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+          new NamespaceAwareCoordinatorStreamStore(metadataStore, SetExecutionEnvContainerIdMapping.TYPE));
+      state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping());
+      generateAndUpdateJobCoordinatorMetadata(jobModelManager.jobModel());
+    }
     // build metastore for container placement messages
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
 
@@ -252,8 +267,12 @@
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      /*
+       * We fan out startpoints if and only if:
+       *  1. Startpoints are enabled in configuration
+       *  2. When AM HA is enabled, the job coordinator metadata also changed across attempts
+       */
+      if (shouldFanoutStartpoint()) {
         StartpointManager startpointManager = createStartpointManager();
         startpointManager.start();
         try {
@@ -324,6 +343,24 @@
   }
 
   /**
+   * Generates the job coordinator metadata for the current application attempt, checks for changes against the
+   * metadata from the previous attempt, and writes the updated metadata to the coordinator stream.
+   *
+   * @param jobModel job model used to generate the job coordinator metadata
+   */
+  @VisibleForTesting
+  void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = createJobCoordinatorMetadataManager();
+
+    JobCoordinatorMetadata previousMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    JobCoordinatorMetadata newMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
+    if (jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata)) {
+      jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+      metadataChangedAcrossAttempts = true;
+    }
+  }
+
+  /**
    * Stops all components of the JobCoordinator.
    */
   private void onShutDown() {
@@ -448,6 +485,39 @@
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isApplicationMasterHighAvailabilityEnabled() {
+    return new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {
+    return metadataChangedAcrossAttempts;
+  }
+
+  /**
+   * We fan out startpoints if and only if:
+   *  1. Startpoints are enabled
+   *  2. When AM HA is enabled, the job coordinator metadata also changed across attempts
+   *
+   * @return true if the above conditions are satisfied, false otherwise
+   */
+  @VisibleForTesting
+  boolean shouldFanoutStartpoint() {
+    JobConfig jobConfig = new JobConfig(config);
+    boolean startpointEnabled = jobConfig.getStartpointEnabled();
+
+    return isApplicationMasterHighAvailabilityEnabled() ?
+        startpointEnabled && isMetadataChangedAcrossAttempts() : startpointEnabled;
   }
 }
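
The fanout condition above can be summarized by the following standalone sketch (not Samza code), with plain booleans standing in for the configuration and metadata state:

```java
// Sketch: with AM HA enabled, startpoint fanout additionally requires that the
// job coordinator metadata changed across attempts.
final class StartpointFanoutDecision {
  static boolean shouldFanout(boolean startpointEnabled, boolean amHaEnabled, boolean metadataChangedAcrossAttempts) {
    return amHaEnabled ? (startpointEnabled && metadataChangedAcrossAttempts) : startpointEnabled;
  }
}
```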
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..88be21f 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -22,6 +22,7 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -234,6 +235,13 @@
         if (isRequestExpired(request)) {
           updateExpiryMetrics(request);
           containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+          // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
+          // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
+          if (!hostAffinityEnabled) {
+            LOG.info("Waiting for resources to get allocated for request {},"
+                + " no retries will be issued since host affinity is disabled", request);
+            break;
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -341,6 +349,16 @@
   }
 
   /**
+   * Requests a resource from the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResource(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -352,6 +370,18 @@
   }
 
   /**
+   * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, preferredHost, delay, faultDomains);
+    issueResourceRequest(request);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -362,6 +392,17 @@
   }
 
   /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
    * plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
@@ -373,6 +414,19 @@
     return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay), faultDomains);
+  }
+
   public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
@@ -381,6 +435,9 @@
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.faultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 
   /**
@@ -447,7 +504,7 @@
    * @param request the request to check
    * @return true if request has expired
    */
-  private boolean isRequestExpired(SamzaResourceRequest request) {
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
     long currTime = Instant.now().toEpochMilli();
     boolean requestExpired =  currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
     if (requestExpired) {
@@ -473,5 +530,8 @@
     } else {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 }
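
A sketch of how the new fault-domain-aware overloads might be called, assuming an already constructed ContainerAllocator is accessible; the processor id and rack ids are hypothetical:

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.samza.clustermanager.ContainerAllocator;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.FaultDomainType;
import org.apache.samza.clustermanager.ResourceRequestState;

final class FaultDomainAwareRequestSketch {
  // Requests a resource on any host, restricted to the given racks (hypothetical rack ids).
  static void requestOnAllowedRacks(ContainerAllocator allocator, String processorId) {
    Set<FaultDomain> allowedRacks = ImmutableSet.of(
        new FaultDomain(FaultDomainType.RACK, "rack-2"),
        new FaultDomain(FaultDomainType.RACK, "rack-3"));
    allocator.requestResource(processorId, ResourceRequestState.ANY_HOST, allowedRacks);
  }
}
```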
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 24130fd..5fcf328 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
@@ -88,8 +89,9 @@
 
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager, Config config) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
+    Preconditions.checkNotNull(faultDomainManager, "Fault domain manager cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
@@ -100,7 +102,7 @@
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager, config, faultDomainManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 1bc1669..143e0b3 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -131,12 +131,14 @@
    */
   private final Map<String, ProcessorFailure> processorFailures = new HashMap<>();
 
+  private final boolean restartContainers;
+
   private ContainerProcessManagerMetrics containerProcessManagerMetrics;
   private JvmMetrics jvmMetrics;
   private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
-      ContainerPlacementMetadataStore metadataStore, LocalityManager localityManager) {
+      ContainerPlacementMetadataStore metadataStore, LocalityManager localityManager, boolean restartContainers) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
@@ -147,6 +149,9 @@
     ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
     this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
 
+    FaultDomainManagerFactory faultDomainManagerFactory = getFaultDomainManagerFactory(clusterManagerConfig);
+    FaultDomainManager faultDomainManager = checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, registry));
+
     // Initialize metrics
     this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.jvmMetrics = new JvmMetrics(registry);
@@ -170,11 +175,12 @@
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled(), localityManager);
+    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager,
+            hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config);
 
     this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+    this.restartContainers = restartContainers;
     LOG.info("Finished container process manager initialization.");
   }
 
@@ -185,7 +191,8 @@
       ClusterResourceManager resourceManager,
       Optional<ContainerAllocator> allocator,
       ContainerManager containerManager,
-      LocalityManager localityManager) {
+      LocalityManager localityManager,
+      boolean restartContainers) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -200,6 +207,7 @@
       () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
           hostAffinityEnabled, this.containerManager));
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+    this.restartContainers = restartContainers;
     LOG.info("Finished container process manager initialization");
   }
 
@@ -236,22 +244,36 @@
       diagnosticsManager.get().start();
     }
 
-    LOG.info("Starting the cluster resource manager");
-    clusterResourceManager.start();
-
+    // In AM-HA, the clusterResourceManager receives already running containers
+    // and invokes onStreamProcessorLaunchSuccess, which in turn updates state;
+    // hence, state has to be set prior to starting the clusterResourceManager.
     state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
     state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
 
+    LOG.info("Starting the cluster resource manager");
+    clusterResourceManager.start();
+
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
+    if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      // don't request resource for container that is already running
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);
+        }
+      });
+    }
     containerAllocator.requestResources(processorToHost);
 
     // Start container allocator thread
@@ -334,7 +356,6 @@
       return;
     }
     state.runningProcessors.remove(processorId);
-
     int exitStatus = resourceStatus.getExitCode();
     switch (exitStatus) {
       case SamzaResourceStatus.SUCCESS:
@@ -413,7 +434,6 @@
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
-
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
@@ -632,6 +652,26 @@
   }
 
   /**
+   * Returns an instantiated {@link FaultDomainManagerFactory} from a {@link ClusterManagerConfig}. The
+   * {@link FaultDomainManagerFactory} is used to return an implementation of a {@link FaultDomainManager}.
+   *
+   * @param clusterManagerConfig the cluster manager config to parse
+   * @return the instantiated {@link FaultDomainManagerFactory}
+   */
+  private FaultDomainManagerFactory getFaultDomainManagerFactory(final ClusterManagerConfig clusterManagerConfig) {
+    final String faultDomainManagerFactoryClass = clusterManagerConfig.getFaultDomainManagerClass();
+    final FaultDomainManagerFactory factory;
+
+    try {
+      factory = ReflectionUtil.getObj(faultDomainManagerFactoryClass, FaultDomainManagerFactory.class);
+    } catch (Exception e) {
+      LOG.error("Error creating the fault domain manager.", e);
+      throw new SamzaException(e);
+    }
+    return factory;
+  }
+
+  /**
    * Obtains the ID of the Samza processor pending launch on the provided resource (container).
    *
    * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
@@ -669,4 +709,4 @@
   private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
     containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator);
   }
-}
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
new file mode 100644
index 0000000..a5ebdd4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may contain one or more hosts.
+ * A cluster can comprise hosts on multiple fault domains.
+ */
+public class FaultDomain {
+
+  private final FaultDomainType type;
+  private final String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    Preconditions.checkNotNull(type, "Fault domain type (ex:rack) cannot be null.");
+    Preconditions.checkNotNull(id, "Fault domain ID (rack ID) cannot be null.");
+
+    this.type = type;
+    this.id = id;
+  }
+
+  /**
+   * Gets the type of fault domain, for example: rack.
+   * @return Type of fault domain
+   */
+  public FaultDomainType getType() {
+    return type;
+  }
+
+  /**
+   * Gets the id of the fault domain, for example: rack ID.
+   * @return fault domain ID
+   */
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return " {" +
+            "type = " + type +
+            ", id = " + id +
+            "} ";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FaultDomain that = (FaultDomain) o;
+    return Objects.equal(type, that.type) && Objects.equal(id, that.id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(type, id);
+  }
+
+}
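
A small sketch of the value semantics above (equals/hashCode based on type and id), which the set-difference logic in StandbyContainerManager later in this patch relies on; the rack ids are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.FaultDomainType;

final class FaultDomainEqualitySketch {
  public static void main(String[] args) {
    Set<FaultDomain> allRacks = new HashSet<>();
    allRacks.add(new FaultDomain(FaultDomainType.RACK, "rack-1"));
    allRacks.add(new FaultDomain(FaultDomainType.RACK, "rack-2"));

    // Removing an equal-valued instance works because equality is based on (type, id).
    allRacks.remove(new FaultDomain(FaultDomainType.RACK, "rack-1"));
    System.out.println(allRacks); // only rack-2 remains
  }
}
```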
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
new file mode 100644
index 0000000..3f4e0a9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information for all hosts running in the cluster
+ *  from the cluster manager (YARN/Kubernetes/etc.).
+ *  It also provides other functionality such as exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for example, based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated if the AM dies or an active
+ *  container is assigned to a host that is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster, for all hosts that are healthy, up and running.
+   * This set might not be up to date with the current state of the cluster, as its freshness is an implementation detail.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domains a particular host resides on, based on the internal cache.
+   * @param host the host
+   * @return the set of {@link FaultDomain}s for the host
+   */
+  Set<FaultDomain> getFaultDomainsForHost(String host);
+
+  /**
+   * This method returns true if the fault domains on which these two hosts reside are exactly the same, false otherwise.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean hasSameFaultDomains(String host1, String host2);
+
+}
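
A hypothetical in-memory implementation, for illustration only; the real implementation is supplied by the cluster-manager-specific factory (e.g. the YARN one referenced in the configuration table earlier in this patch), and this sketch simply serves lookups from a fixed host-to-fault-domain map:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.FaultDomainManager;

// Hypothetical sketch: serves fault domain lookups from a fixed map instead of a cluster manager.
final class InMemoryFaultDomainManager implements FaultDomainManager {
  private final Map<String, Set<FaultDomain>> hostToFaultDomains;

  InMemoryFaultDomainManager(Map<String, Set<FaultDomain>> hostToFaultDomains) {
    this.hostToFaultDomains = hostToFaultDomains;
  }

  @Override
  public Set<FaultDomain> getAllFaultDomains() {
    Set<FaultDomain> all = new HashSet<>();
    hostToFaultDomains.values().forEach(all::addAll);
    return all;
  }

  @Override
  public Set<FaultDomain> getFaultDomainsForHost(String host) {
    return hostToFaultDomains.getOrDefault(host, Collections.emptySet());
  }

  @Override
  public boolean hasSameFaultDomains(String host1, String host2) {
    return getFaultDomainsForHost(host1).equals(getFaultDomainsForHost(host2));
  }
}
```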
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
new file mode 100644
index 0000000..cef0148
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * A factory to build a {@link FaultDomainManager}.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManagerFactory {
+
+  FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry);
+}
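
A matching factory sketch that returns the hypothetical in-memory manager shown after FaultDomainManager above (assumed to be in the same package); a real job would instead point cluster-manager.fault-domain-manager.factory at the YARN factory from the configuration table:

```java
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.FaultDomainManager;
import org.apache.samza.clustermanager.FaultDomainManagerFactory;
import org.apache.samza.clustermanager.FaultDomainType;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;

// Hypothetical factory; host names and rack ids are made up for illustration.
public class InMemoryFaultDomainManagerFactory implements FaultDomainManagerFactory {
  @Override
  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
    Map<String, Set<FaultDomain>> hostToRacks = new HashMap<>();
    hostToRacks.put("host-a", ImmutableSet.of(new FaultDomain(FaultDomainType.RACK, "rack-1")));
    hostToRacks.put("host-b", ImmutableSet.of(new FaultDomain(FaultDomainType.RACK, "rack-2")));
    return new InMemoryFaultDomainManager(hostToRacks);
  }
}
```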
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
new file mode 100644
index 0000000..ca2fdd1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+/**
+ * This enum defines the types of fault domains, which depend on the environment the cluster runs in.
+ */
+public enum FaultDomainType {
+    RACK
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 784f0b4..f442094 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -106,6 +106,17 @@
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
   /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id, e.g. YARN container id).
+   * This map will be used during the startup phase of the new AM in AM-HA.
+   *
+   * This map is populated at startup of the ClusterBasedJobCoordinator.
+   * It initially holds the processorId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to processors currently running, or to processors from a previous attempt or a previous deployment.
+   * TODO: SAMZA-2607 : remove this map and all its usages.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);
+
+  /**
    *  Map of the failed Samza processor ID to resource status of the last attempted of the container.
    *  This map is only used when {@link org.apache.samza.config.ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES}
    *  is set to false, this map tracks the containers which have exhausted all retires for restart and JobCoordinator is
@@ -170,6 +181,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a job.
+   */
+  public final AtomicInteger faultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of fault domain aware containers started for a job.
+   */
+  public final AtomicInteger faultDomainAwareContainersStarted = new AtomicInteger(0);
+
+  /**
+   * Number of expired fault domain aware container requests made for a job.
+   */
+  public final AtomicInteger expiredFaultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of failed fault domain aware container allocations for a job.
+   */
+  public final AtomicInteger failedFaultDomainAwareContainerAllocations = new AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 5ca8b24..be804c9 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -19,7 +19,11 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.time.Instant;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +53,10 @@
    */
   private final String preferredHost;
   /**
+   * The set of fault domains on which the resource must be allocated.
+   */
+  private final Set<FaultDomain> faultDomains;
+  /**
    * A request is identified by an unique identifier.
    */
   private final String requestId;
@@ -63,7 +71,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), ImmutableSet.of());
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Set<FaultDomain> faultDomains) {
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), faultDomains);
   }
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp) {
@@ -73,7 +85,22 @@
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
     this.requestTimestamp = requestTimestamp;
-    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);
+    this.faultDomains = new HashSet<>();
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}, and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains);
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp, Set<FaultDomain> faultDomains) {
+    Preconditions.checkNotNull(faultDomains, "Set of fault domains should not be null.");
+    this.numCores = numCores;
+    this.memoryMB = memoryMB;
+    this.preferredHost = preferredHost;
+    this.requestId = UUID.randomUUID().toString();
+    this.processorId = processorId;
+    this.requestTimestamp = requestTimestamp;
+    this.faultDomains = faultDomains;
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {} and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains.toString());
   }
 
   public String getProcessorId() {
@@ -96,6 +123,10 @@
     return preferredHost;
   }
 
+  public Set<FaultDomain> getFaultDomains() {
+    return faultDomains;
+  }
+
   public int getMemoryMB() {
     return memoryMB;
   }
@@ -109,6 +140,7 @@
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + faultDomains.toString() +
             '}';
   }
 
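
A brief sketch of the two constructor paths, with hypothetical sizing, host and processor values; a request built without fault domains reports an empty set from getFaultDomains(), which is what keeps it out of the new fault-domain-aware counters in SamzaApplicationState:

```java
import com.google.common.collect.ImmutableSet;
import java.time.Instant;
import org.apache.samza.clustermanager.FaultDomain;
import org.apache.samza.clustermanager.FaultDomainType;
import org.apache.samza.clustermanager.SamzaResourceRequest;

final class SamzaResourceRequestSketch {
  public static void main(String[] args) {
    // Hypothetical values: 1 core, 1024 MB, preferred host and processor id.
    SamzaResourceRequest plainRequest = new SamzaResourceRequest(1, 1024, "host-a", "0");
    SamzaResourceRequest rackAwareRequest = new SamzaResourceRequest(1, 1024, "host-b", "1", Instant.now(),
        ImmutableSet.of(new FaultDomain(FaultDomainType.RACK, "rack-2")));

    System.out.println(plainRequest.getFaultDomains().isEmpty());     // true
    System.out.println(rackAwareRequest.getFaultDomains().isEmpty()); // false
  }
}
```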
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index b849ea5..a07a924 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +29,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -56,8 +59,13 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+  // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final boolean isFaultDomainAwareStandbyEnabled;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+                                 LocalityManager localityManager, Config config, FaultDomainManager faultDomainManager) {
     this.failovers = new ConcurrentHashMap<>();
     this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
@@ -70,6 +78,9 @@
         .forEach(containerId -> standbyContainerConstraints.put(containerId,
             StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
     this.clusterResourceManager = clusterResourceManager;
+    this.faultDomainManager = faultDomainManager;
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.isFaultDomainAwareStandbyEnabled = clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
 
     log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
   }
@@ -125,7 +136,9 @@
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     } else {
       initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
     }
@@ -157,6 +170,22 @@
   }
 
   /**
+   * This method returns the set of fault domains a container may be placed on, given a host whose fault domains must be avoided.
+   * The returned set is the set difference between all the available fault domains in the
+   * cluster and the fault domains associated with the host that is passed as input.
+   * @param hostToAvoid hostname whose fault domains are excluded
+   * @return the set of fault domains, excluding the fault domains that the given host is on
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String hostToAvoid) {
+    Set<FaultDomain> allFaultDomains = faultDomainManager.getAllFaultDomains();
+    Set<FaultDomain> faultDomainToAvoid = Optional.ofNullable(hostToAvoid)
+            .map(faultDomainManager::getFaultDomainsForHost)
+            .orElse(Collections.emptySet());
+    allFaultDomains.removeAll(faultDomainToAvoid);
+    return allFaultDomains;
+  }
+
+  /**
    *  If a standby container has stopped, then there are two possible cases
    *    Case 1. during a failover, the standby container was stopped for an active's start, then we
    *       1. request a resource on the standby's host to place the activeContainer, and
@@ -181,24 +210,29 @@
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      requestResource(containerAllocator, standbyContainerID, ResourceRequestState.ANY_HOST, Duration.ZERO, standbyContainerHostname);
     } else {
       log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
           standbyContainerID, preferredHost);
-      containerAllocator.requestResourceWithDelay(standbyContainerID, preferredHost, preferredHostRetryDelay);
+      String activeContainerHost = getActiveContainerHost(standbyContainerID)
+              .orElse(null);
+      requestResource(containerAllocator, standbyContainerID, preferredHost, preferredHostRetryDelay, activeContainerHost);
     }
   }
 
   /** Method to handle standby-aware allocation for an active container.
    *  We try to find a standby host for the active container, and issue a stop on any standby-containers running on it,
    *  request resource to place the active on the standby's host, and one to place the standby elsewhere.
-   *
+   *  NOTE: When rack awareness is turned on, we pass the <code>hostToAvoid</code> parameter as null to the {@link #requestResource} calls made here,
+   *  because the host of the previous active processor that died is no longer present in the running or pending container list.
+   *  Placement on different racks is still guaranteed through {@link #checkStandbyConstraintsAndRunStreamProcessor}.
    * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
    */
@@ -235,7 +269,7 @@
 
         // record the resource request, before issuing it to avoid race with allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, standbyHost);
         failoverMetadata.recordResourceRequest(resourceRequestForActive);
         containerAllocator.issueResourceRequest(resourceRequestForActive);
         samzaApplicationState.failoversToStandby.incrementAndGet();
@@ -281,7 +315,7 @@
     Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID);
 
     // Iterate over the list of running standby containers, to find a standby resource that we have not already
-    // used for a failover for this active resoruce
+    // used for a failover for this active resource
     for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
 
       if (samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
@@ -307,7 +341,7 @@
               .map(ProcessorLocality::host)
               .orElse(null);
 
-      if (StringUtils.isNotBlank(standbyHost)) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", standbyContainerID);
       } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
@@ -361,8 +395,44 @@
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * Checks the config to determine whether standby allocation is fault-domain aware, and requests resources accordingly.
    *
+   * @param containerAllocator ContainerAllocator used to request resources from the resource manager
+   * @param containerID Samza container ID that will run once a resource is allocated for this request
+   * @param preferredHost the preferred host to run the processor on
+   * @param preferredHostRetryDelay the {@link Duration} to add to the request timestamp
+   * @param hostToAvoid hostname to avoid requesting this resource on when fault-domain-aware standby allocation is enabled
+   */
+  void requestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost, Duration preferredHostRetryDelay, String hostToAvoid) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID) && isFaultDomainAwareStandbyEnabled) {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, getAllowedFaultDomainsGivenHostToAvoid(hostToAvoid));
+    } else {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, new HashSet<>());
+    }
+  }
+
+  /**
+   * This method returns the active container host given a standby or active container ID.
+   *
+   * @param containerID standby or active container ID
+   * @return The active container host
+   */
+  Optional<String> getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    return Optional.ofNullable(resource)
+            .map(SamzaResource::getHost);
+  }
+
+  /**
+   * Check if matching this SamzaResourceRequest to the given resource meets all standby-container constraints.
+   * This includes the check that a standby and its active should not be on the same fault domain or the same host.
    * @param containerIdToStart logical id of the container to start
    * @param host potential host to start the container on
   * @return true if all standby constraints are met, false otherwise
@@ -375,17 +445,33 @@
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
     }
@@ -409,16 +495,21 @@
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      if (isFaultDomainAwareStandbyEnabled && StandbyTaskUtil.isStandbyContainer(containerID)) {
+        samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
+      }
     } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       // This resource cannot be used to launch this standby container, so we make a new anyhost request
       log.info(
           "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
-      // This resource cannot be used to launch this active container container, so we initiate a failover
+      // This resource cannot be used to launch this active container, so we initiate a failover
       log.warn(
           "Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
@@ -469,7 +560,6 @@
     resourceRequestState.cancelResourceRequest(request);
   }
 
-
   // Handle an expired resource request that was made for placing a standby container
   private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
       Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
@@ -486,7 +576,9 @@
       // If there is no alternative-resource for the standby container we make a new anyhost request
       log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
       resourceRequestState.cancelResourceRequest(request);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     }
   }
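
For readers skimming the patch, the fault-domain exclusion in getAllowedFaultDomainsGivenHostToAvoid is just a set difference over the cluster's fault domains. A minimal standalone sketch of the same idea, using plain strings in place of Samza's FaultDomain objects (the rack names are made up):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class FaultDomainDifferenceSketch {
  public static void main(String[] args) {
    // All fault domains (e.g. racks) known to the cluster manager.
    Set<String> allFaultDomains = new HashSet<>(Arrays.asList("rack-1", "rack-2", "rack-3"));
    // Fault domains of the host the standby should avoid (the active container's host).
    Set<String> faultDomainsToAvoid = Collections.singleton("rack-2");

    // Copy first so the cluster manager's view is not mutated, then subtract.
    Set<String> allowed = new HashSet<>(allFaultDomains);
    allowed.removeAll(faultDomainsToAvoid);

    System.out.println(allowed); // [rack-1, rack-3], in some order
  }
}
```

Note that the patched method calls removeAll directly on the set returned by FaultDomainManager.getAllFaultDomains(); the sketch copies the set first, which is the safer choice if that set is shared or immutable.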
 
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 231bdda..3f27991 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -38,6 +38,15 @@
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "cluster-manager.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnFaultDomainManagerFactory";
+
+  /**
+   * Determines whether standby allocation is fault domain aware or not.
+   */
+  public static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "cluster-manager.fault-domain-aware.standby.enabled";
+  public static final boolean FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false;
+
   /**
    * Sleep interval for the allocator thread in milliseconds
    */
@@ -250,6 +259,14 @@
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
+  public String getFaultDomainManagerClass() {
+    return get(FAULT_DOMAIN_MANAGER_FACTORY, FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT);
+  }
+
+  public boolean getFaultDomainAwareStandbyEnabled() {
+    return getBoolean(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT);
+  }
+
   public boolean getJmxEnabledOnJobCoordinator() {
     if (containsKey(CLUSTER_MANAGER_JMX_ENABLED)) {
       log.warn("Configuration {} is deprecated. Please use {}", CLUSTER_MANAGER_JMX_ENABLED, JOB_JMX_ENABLED);
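
As a quick illustration of the new knobs, the sketch below reads them through ClusterManagerConfig using a plain MapConfig; the key string is the constant added above, and the printed defaults follow from FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT and FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT. This is only an illustrative snippet, not part of the patch.

```java
import java.util.Collections;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.MapConfig;

public final class FaultDomainConfigSketch {
  public static void main(String[] args) {
    // With no overrides, fault-domain-aware standby is disabled and the YARN factory is used.
    ClusterManagerConfig defaults = new ClusterManagerConfig(new MapConfig());
    System.out.println(defaults.getFaultDomainAwareStandbyEnabled()); // false
    System.out.println(defaults.getFaultDomainManagerClass());
    // -> org.apache.samza.job.yarn.YarnFaultDomainManagerFactory

    // Turning the feature on for a job.
    ClusterManagerConfig enabled = new ClusterManagerConfig(new MapConfig(
        Collections.singletonMap("cluster-manager.fault-domain-aware.standby.enabled", "true")));
    System.out.println(enabled.getFaultDomainAwareStandbyEnabled()); // true
  }
}
```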
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b274e9..021c67d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -147,6 +147,21 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
+  // High availability allows a new AM to establish connections with already-running containers.
+  public static final String YARN_AM_HIGH_AVAILABILITY_ENABLED = "yarn.am.high-availability.enabled";
+  public static final boolean YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT = false;
+
+  // If AM-HA is enabled and a running container loses its heartbeat with the AM,
+  // this count gives the number of times the container will attempt to establish a heartbeat with the new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_COUNT = "yarn.container.heartbeat.retry.count";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT = 5;
+
+  // If AM-HA is enabled and a running container loses its heartbeat with the AM,
+  // this duration gives the amount of time the container will sleep between attempts to establish a heartbeat with the new AM.
+  public static final String YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
+  public static final long YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT = 10000;
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -398,6 +413,19 @@
     return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
   }
 
+  public boolean getApplicationMasterHighAvailabilityEnabled() {
+    return getBoolean(YARN_AM_HIGH_AVAILABILITY_ENABLED, YARN_AM_HIGH_AVAILABILITY_ENABLED_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetryCount() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_COUNT, YARN_CONTAINER_HEARTBEAT_RETRY_COUNT_DEFAULT);
+  }
+
+  public long getContainerHeartbeatRetrySleepDurationMs() {
+    return getLong(YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS,
+        YARN_CONTAINER_HEARTBEAT_RETRY_SLEEP_DURATION_MS_DEFAULT);
+  }
+
   /**
    * Get config loader factory according to the configs
    * @return full qualified name of {@link ConfigLoaderFactory}
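
To put the retry settings above in perspective: a container that has lost its AM retries the heartbeat discovery up to yarn.container.heartbeat.retry.count times, sleeping yarn.container.heartbeat.retry-sleep-duration.ms between attempts, so with the defaults the worst-case wait for a new AM is roughly 5 × 10000 ms. A small sketch of reading these values (MapConfig is used here purely for illustration):

```java
import java.util.Collections;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;

public final class AmHaConfigSketch {
  public static void main(String[] args) {
    // Enable AM-HA and keep the retry count and sleep duration at their defaults (5 and 10000 ms).
    JobConfig jobConfig = new JobConfig(new MapConfig(
        Collections.singletonMap("yarn.am.high-availability.enabled", "true")));

    System.out.println(jobConfig.getApplicationMasterHighAvailabilityEnabled()); // true
    long retries = jobConfig.getContainerHeartbeatRetryCount();                  // 5
    long sleepMs = jobConfig.getContainerHeartbeatRetrySleepDurationMs();        // 10000
    // Rough upper bound on how long the container polls for a new AM before force-exiting.
    System.out.println("worst-case wait ~" + (retries * sleepMs) + " ms");       // ~50000 ms
  }
}
```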
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 89b5fc9..0eba766 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,26 +39,47 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+
   @VisibleForTesting
   static final int SCHEDULE_MS = 60000;
   @VisibleForTesting
   static final int SHUTDOWN_TIMOUT_MS = 120000;
 
   private final Runnable onContainerExpired;
-  private final ContainerHeartbeatClient containerHeartbeatClient;
   private final ScheduledExecutorService scheduler;
+  private final String containerExecutionId;
+  private final MetadataStore coordinatorStreamStore;
+  private final long sleepDurationForReconnectWithAM;
+  private final boolean isApplicationMasterHighAvailabilityEnabled;
+  private final long retryCount;
+
+  private ContainerHeartbeatClient containerHeartbeatClient;
+  private String coordinatorUrl;
   private boolean started = false;
 
-  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
-    this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
+    this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
+        Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
+        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
-      ScheduledExecutorService scheduler) {
+      ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
+      long retryCount, long sleepDurationForReconnectWithAM) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
+    this.coordinatorUrl = coordinatorUrl;
+    this.containerExecutionId = containerExecutionId;
+    this.coordinatorStreamStore = coordinatorStreamStore;
+    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
+    this.retryCount = retryCount;
+    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
   }
 
   public void start() {
@@ -65,13 +91,21 @@
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
-        scheduler.schedule(() -> {
-          // On timeout of container shutting down, force exit.
-          LOG.error("Graceful shutdown timeout expired. Force exiting.");
-          ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown timeout.");
-          System.exit(1);
-        }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
-        onContainerExpired.run();
+        if (isApplicationMasterHighAvailabilityEnabled) {
+          LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl);
+          try {
+            if (checkAndEstablishConnectionWithNewAM()) {
+              return;
+            }
+          } catch (Exception e) {
+            // On exception in re-establish connection with new AM, force exit.
+            LOG.error("Exception trying to connect with new AM", e);
+            forceExit("Failed to establish connection with new AM", 0);
+            return;
+          }
+        }
+        // On timeout of container shutting down, force exit.
+        forceExit("Graceful shutdown timeout expired. Force exiting.", SHUTDOWN_TIMOUT_MS);
       }
     }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
     started = true;
@@ -84,6 +118,47 @@
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= retryCount) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = createContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        throw new SamzaException(e);
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, String containerExecutionId) {
+    return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
+  }
+
+  private void forceExit(String message, int timeout) {
+    scheduler.schedule(() -> {
+      LOG.error(message);
+      ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + message);
+      System.exit(1);
+    }, timeout, TimeUnit.MILLISECONDS);
+    onContainerExpired.run();
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
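
Stripped of the Samza types, checkAndEstablishConnectionWithNewAM is a bounded poll: read the coordinator URL from the store, sleep if it has not changed, and switch over once it has. A self-contained sketch of that control flow (the Supplier and URLs are illustrative stand-ins for the metadata-store lookup; it does not touch the Samza classes):

```java
import java.util.function.Supplier;

public final class BoundedUrlPollSketch {

  /**
   * Polls urlSource up to maxAttempts times, sleeping sleepMs between attempts,
   * and returns the first URL that differs from currentUrl, or null if none appears.
   */
  static String waitForNewUrl(Supplier<String> urlSource, String currentUrl,
      int maxAttempts, long sleepMs) throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      String candidate = urlSource.get();
      if (!currentUrl.equals(candidate)) {
        return candidate;          // a new AM has published its URL
      }
      Thread.sleep(sleepMs);       // unchanged: back off and try again
    }
    return null;                   // exhausted retries without finding a new AM
  }

  public static void main(String[] args) throws InterruptedException {
    // Simulate the metadata store returning a new URL on the third read.
    int[] reads = {0};
    Supplier<String> store = () -> ++reads[0] < 3 ? "http://old-am:1234" : "http://new-am:5678";
    System.out.println(waitForNewUrl(store, "http://old-am:1234", 5, 10L));
  }
}
```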
diff --git a/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
new file mode 100644
index 0000000..557ef77
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading execution environment container id information in the metadata store.
+ * It stores the mapping from processor id (logical Samza processor id) to execution environment container id (e.g. YARN container id).
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager on top of the provided {@link MetadataStore}.
+   * Uses a {@link CoordinatorStreamValueSerde} to serialize values before reading from or writing to the metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} used to read/write the processor-to-container id mapping.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id cannot be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id cannot be null.");
+    LOG.info("Processor {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);
+    metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Processor {} has executionEnvContainerId as {}", entry.getKey(), entry.getValue());
+      }
+    }
+    return executionEnvironmentContainerIdMapping;
+  }
+
+  public void close() {
+    metadataStore.close();
+  }
+}
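
A brief usage sketch of the new manager; the MetadataStore instance is assumed to be supplied by the caller (in this patch it is a NamespaceAwareCoordinatorStreamStore scoped to SetExecutionEnvContainerIdMapping.TYPE, as wired up in ContainerLaunchUtil below):

```java
import java.util.Map;
import org.apache.samza.container.ExecutionContainerIdManager;
import org.apache.samza.metadatastore.MetadataStore;

public final class ExecutionContainerIdSketch {

  // Records the physical (e.g. YARN) container id for a logical processor id and
  // reads the full processor-id -> execution-env-container-id mapping back.
  static Map<String, String> recordAndRead(MetadataStore store, String processorId,
      String execEnvContainerId) {
    ExecutionContainerIdManager manager = new ExecutionContainerIdManager(store);
    manager.writeExecutionEnvironmentContainerIdMapping(processorId, execEnvContainerId);
    return manager.readExecutionEnvironmentContainerIdMapping();
  }
}
```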
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
new file mode 100644
index 0000000..c4540a6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage read and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides
+ * additional helper functionalities to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String METADATA_GENERATION_FAILED_COUNT = "metadataGenerationFailedCount";
+  private static final String METADATA_READ_FAILED_COUNT = "metadataReadFailedCount";
+  private static final String METADATA_WRITE_FAILED_COUNT = "metadataWriteFailedCount";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Counter metadataGenerationFailedCount;
+  private final Counter metadataReadFailedCount;
+  private final Counter metadataWriteFailedCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final ClusterType clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType,
+      MetricsRegistry metricsRegistry) {
+    this(metadataStore, clusterType, metricsRegistry,
+        new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry,
+      Serde<String> valueSerde) {
+    Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
+
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = valueSerde;
+
+    applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0);
+    metadataGenerationFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS,
+        METADATA_GENERATION_FAILED_COUNT);
+    metadataReadFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, METADATA_READ_FAILED_COUNT);
+    metadataWriteFailedCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, METADATA_WRITE_FAILED_COUNT);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more
+   * details on how it is generated and the properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier generated from the input {@link Config}. It uses
+   * a {@link Funnel} over a subset of the input configuration to generate the identifier; as long as that subset
+   * of the configuration remains the same, the identifier is guaranteed to be the same. For the list of config prefixes used
+   * by the funnel refer to {@link ConfigHashFunnel}.
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only
+   * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We
+   * serialize the data into bytes and use those bytes to compute the identifier.
+   *
+   * In the case of YARN, the epoch identifier is extracted from the application attempt and corresponds to the applicationId,
+   * e.g. 1606797336059_0010.
+   * Both the config and job model identifiers should be 32-bit integers.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
+    try {
+      int jobModelId = Hashing
+          .crc32c()
+          .hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      metadataGenerationFailedCount.inc();
+      LOG.error("Failed to generate metadata for the current attempt due to ", e);
+      throw new SamzaException("Failed to generate the metadata for the current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Check for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes within
+   * {@linkplain JobCoordinatorMetadata} changes.
+   *
+   * We intentionally check each attribute separately so that changes can be tracked at this granularity. This information
+   * is used to determine whether more targeted handling of these changes is possible, instead of blindly restarting all the
+   * containers upstream.
+   *
+   * @param newMetadata new metadata to be compared
+   * @param previousMetadata previous metadata to be compared against
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata
+   * associated with cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.name().equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          metadataReadFailedCount.inc();
+          LOG.error("Failed to read job coordinator metadata due to ", e);
+        }
+      }
+    }
+
+    LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", clusterType, metadata);
+    return metadata;
+  }
+
+  /**
+   * Persist the {@link JobCoordinatorMetadata} in metadata store. The job coordinator metadata is associated
+   * with the cluster type specified at the creation of the manager.
+   *
+   * @param metadata metadata to be persisted
+   *
+   * @throws SamzaException in case of exception encountered during the writes to underlying metadata store
+   */
+  public void writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
+    Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be null");
+
+    try {
+      String metadataValueString = metadataMapper.writeValueAsString(metadata);
+      metadataStore.put(clusterType.name(), valueSerde.toBytes(metadataValueString));
+      LOG.info("Successfully wrote job coordinator metadata: {} for cluster {}.", metadata, clusterType);
+    } catch (Exception e) {
+      metadataWriteFailedCount.inc();
+      LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);
+      throw new SamzaException("Failed to write the job coordinator metadata.", e);
+    }
+  }
+
+  /**
+   * Generate the epoch id using the execution container id that is passed through the system environment. This isn't an ideal
+   * way of generating this ID; we will eventually need a contract between the underlying cluster manager and the Samza engine
+   * around what the epoch ID should look like and what is needed to generate it across different cluster offerings.
+   * Given these unknowns, we leave it as is and keep it simple for now. It is preferable to keep it this way
+   * instead of introducing a loosely defined interface/API and marking it unstable.
+   *
+   * The properties of the epoch identifier are as follows
+   *  1. Unique across applications in the cluster
+   *  2. Remains unchanged within a single deployment lifecycle
+   *  3. Remains unchanged across application attempt within a single deployment lifecycle
+   *  4. Changes across deployment lifecycle
+   *
+   *  Note: The above properties are something we want to keep intact when extracting this into a well-defined interface
+   *  or contract for YARN AM HA to work.
+   *  The format and property used to generate the ID are specific to YARN; the container name format
+   *  is a public contract of YARN and is likely to remain backward compatible.
+   *
+   * @return an identifier associated with the job coordinator satisfying the above properties
+   */
+  @VisibleForTesting
+  String fetchEpochIdForJobCoordinator() {
+    String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+  }
+
+  @VisibleForTesting
+  Counter getApplicationAttemptCount() {
+    return applicationAttemptCount;
+  }
+
+  @VisibleForTesting
+  Counter getMetadataGenerationFailedCount() {
+    return metadataGenerationFailedCount;
+  }
+
+  @VisibleForTesting
+  Counter getMetadataReadFailedCount() {
+    return metadataReadFailedCount;
+  }
+
+  @VisibleForTesting
+  Counter getMetadataWriteFailedCount() {
+    return metadataWriteFailedCount;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
+    return jobModelChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
+    return configChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getNewDeployment() {
+    return newDeployment;
+  }
+
+  @VisibleForTesting
+  String getEnvProperty(String propertyName) {
+    return System.getenv(propertyName);
+  }
+
+  /**
+   * A helper class to generate a hash for the {@link Config} based on a subset of the configuration.
+   * The subset used consists of the configurations whose keys match the allowed prefixes.
+   */
+  private static class ConfigHashFunnel implements Funnel<Config> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigHashFunnel.class);
+    // using sorted set to ensure the hash computation on configurations is reproducible and deterministic
+    private static final SortedSet<String> ALLOWED_PREFIXES = ImmutableSortedSet.of("job.autosizing");
+    @Override
+    public void funnel(Config from, PrimitiveSink into) {
+      SortedMap<String, String> map = new TreeMap<>();
+
+      ALLOWED_PREFIXES.forEach(prefix -> map.putAll(from.subset(prefix, false)));
+      LOG.info("Using the config {} to generate hash", map);
+      map.forEach((key, value) -> {
+        into.putUnencodedChars(key);
+        into.putUnencodedChars(value);
+      });
+    }
+  }
+
+  /**
+   * Type of the cluster deployment associated with the {@link JobCoordinatorMetadataManager}
+   */
+  public enum ClusterType {
+    YARN
+  }
+}
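
Two of the ideas above are easy to demonstrate in isolation: the epoch id is simply the applicationId portion of the YARN container id, and the config id is a CRC over a sorted, prefix-filtered view of the config. The sketch below mirrors that logic without the Samza classes; the exact hash value will differ from what ConfigHashFunnel produces (it feeds keys and values into the hasher via a Funnel), and the config key shown is just an example under the job.autosizing prefix.

```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;

public final class JobCoordinatorMetadataSketch {

  // Mirrors fetchEpochIdForJobCoordinator(): "container_1606797336059_0010_01_000001"
  // yields "1606797336059_0010", which stays fixed across AM attempts of one deployment.
  static String epochIdFrom(String yarnContainerId) {
    String[] parts = yarnContainerId.split("_");
    return parts[1] + "_" + parts[2];
  }

  public static void main(String[] args) {
    System.out.println(epochIdFrom("container_1606797336059_0010_01_000001"));

    // A reproducible 32-bit id over a sorted subset of the config: same subset, same id.
    SortedMap<String, String> subset = new TreeMap<>();
    subset.put("job.autosizing.enabled", "true"); // example key under the allowed prefix
    StringBuilder sb = new StringBuilder();
    subset.forEach((k, v) -> sb.append(k).append(v));
    int configId = Hashing.crc32().hashString(sb, StandardCharsets.UTF_8).asInt();
    System.out.println(configId);
  }
}
```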
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index 9b862bd..c2df96c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -23,6 +23,8 @@
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -51,7 +53,10 @@
   public String fromBytes(byte[] bytes) {
     Map<String, Object> values = messageSerde.fromBytes(bytes);
     CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[]{}, values);
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping executionContainerIdMapping = new SetExecutionEnvContainerIdMapping(message);
+      return executionContainerIdMapping.getExecutionEnvironmentContainerId();
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(message);
       return hostMapping.getHostLocality();
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
@@ -69,6 +74,9 @@
     } else if (type.equalsIgnoreCase(SetTaskPartitionMapping.TYPE)) {
       SetTaskPartitionMapping setTaskPartitionMapping = new SetTaskPartitionMapping(message);
       return setTaskPartitionMapping.getTaskNames();
+    } else if (type.equalsIgnoreCase(SetJobCoordinatorMetadataMessage.TYPE)) {
+      SetJobCoordinatorMetadataMessage jobCoordinatorMetadataMessage = new SetJobCoordinatorMetadataMessage(message);
+      return jobCoordinatorMetadataMessage.getJobCoordinatorMetadata();
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
@@ -76,7 +84,11 @@
 
   @Override
   public byte[] toBytes(String value) {
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping
+          executionEnvContainerIdMapping = new SetExecutionEnvContainerIdMapping(SOURCE, "", value);
+      return messageSerde.toBytes(executionEnvContainerIdMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, "", "");
       return messageSerde.toBytes(hostMapping.getMessageMap());
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
@@ -94,6 +106,9 @@
     } else if (type.equalsIgnoreCase(SetTaskPartitionMapping.TYPE)) {
       SetTaskPartitionMapping setTaskPartitionMapping = new SetTaskPartitionMapping(SOURCE, "", value);
       return messageSerde.toBytes(setTaskPartitionMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetJobCoordinatorMetadataMessage.TYPE)) {
+      SetJobCoordinatorMetadataMessage jobCoordinatorMetadataMessage = new SetJobCoordinatorMetadataMessage(SOURCE, "", value);
+      return messageSerde.toBytes(jobCoordinatorMetadataMessage.getMessageMap());
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
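
A round-trip sketch of the two new branches of the serde; the container id and metadata strings are placeholders (in production the metadata value is the Jackson-serialized JobCoordinatorMetadata written by JobCoordinatorMetadataManager):

```java
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;

public final class CoordinatorSerdeRoundTripSketch {
  public static void main(String[] args) {
    // Processor-to-execution-environment mapping: the value is the physical container id.
    CoordinatorStreamValueSerde execSerde =
        new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
    byte[] execBytes = execSerde.toBytes("container_1606797336059_0010_01_000002");
    System.out.println(execSerde.fromBytes(execBytes)); // prints the same container id

    // Job coordinator metadata: the value is an opaque (JSON) metadata string.
    CoordinatorStreamValueSerde metaSerde =
        new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
    byte[] metaBytes = metaSerde.toBytes("example-serialized-job-coordinator-metadata");
    System.out.println(metaSerde.fromBytes(metaBytes)); // prints the same string
  }
}
```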
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
new file mode 100644
index 0000000..508b1df
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetExecutionEnvContainerIdMapping is used internally by the Samza framework to
+ * persist the processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ProcessorId
+ *     Type: set-execution-env-container-id-mapping
+ *     Source: "SamzaContainer-$ProcessorId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionEnvContainerIdMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-execution-env-container-id-mapping";
+  public static final String EXEC_ENV_ID_KEY = "execution-env-container-id";
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param message which holds the processor id to execution environment id mapping information.
+   */
+  public SetExecutionEnvContainerIdMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information.
+   * @param source the source of the message
+   * @param key the key which is used to persist the message
+   * @param executionEnvContainerId the execution environment container id
+   */
+  public SetExecutionEnvContainerIdMapping(String source, String key, String executionEnvContainerId) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(EXEC_ENV_ID_KEY, executionEnvContainerId);
+  }
+
+  public String getExecutionEnvironmentContainerId() {
+    return getMessageValue(EXEC_ENV_ID_KEY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java
new file mode 100644
index 0000000..55c37f4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetJobCoordinatorMetadataMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetJobCoordinatorMetadataMessage is used internally by the Samza framework to
+ * persist {@link org.apache.samza.job.JobCoordinatorMetadata} in coordinator stream.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: meta-key
+ *     Type: set-job-coordinator-metadata
+ *     Source: "SamzaContainer"
+ *     MessageMap:
+ *     {
+ *         "epoch-id": epoch identifier of the job coordinator,
+ *         "job-model-id": identifier associated with the snapshot of job model used by the job coordinator,
+ *         "config-id": identifier associated with the snapshot of the configuration used by the job coordinator
+ *     }
+ * }
+ * */
+public class SetJobCoordinatorMetadataMessage extends CoordinatorStreamMessage {
+  private static final String META_KEY = "meta-key";
+  public static final String TYPE = "set-job-coordinator-metadata";
+
+  public SetJobCoordinatorMetadataMessage(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  public SetJobCoordinatorMetadataMessage(String source, String clusterType, String metaMessage) {
+    super(source);
+    setType(TYPE);
+    setKey(clusterType);
+    putMessageValue(META_KEY, metaMessage);
+  }
+
+  public String getJobCoordinatorMetadata() {
+    return getMessageValue(META_KEY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a5148fb..459ad89 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -30,8 +30,8 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
@@ -39,9 +39,12 @@
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
@@ -97,7 +100,7 @@
       String jobName,
       String jobId,
       String containerId,
-      Optional<String> execEnvContainerId,
+      Optional<String> execEnvContainerIdOptional,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -117,7 +120,7 @@
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional, config);
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -142,11 +145,24 @@
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      ContainerHeartbeatMonitor heartbeatMonitor =
+          createContainerHeartbeatMonitor(container,
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE),
+              jobConfig.getApplicationMasterHighAvailabilityEnabled(), jobConfig.getContainerHeartbeatRetryCount(),
+              jobConfig.getContainerHeartbeatRetrySleepDurationMs());
       if (heartbeatMonitor != null) {
         heartbeatMonitor.start();
       }
 
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+        execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+          ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
+          executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
+        });
+      }
+
       container.run();
       if (heartbeatMonitor != null) {
         heartbeatMonitor.stop();
@@ -189,9 +205,15 @@
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isApplicationMasterHighAvailabilityEnabled whether AM HA is enabled, so that the new AM url can be fetched
+   * @param retryCount number of times to retry connecting to new AM when heartbeat expires
+   * @param sleepDurationForReconnectWithAM sleep duration between retries to connect to new AM when heartbeat expires
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
    */
-  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container,
+      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
+      long sleepDurationForReconnectWithAM) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
@@ -204,7 +226,8 @@
           log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
           System.exit(1);
         }
-      }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      }, coordinatorUrl, executionEnvContainerId, coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled,
+          retryCount, sleepDurationForReconnectWithAM);
     } else {
       log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
       return null;
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7687eb2..7b0a08f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -431,25 +431,38 @@
       userDefinedProcessorLifecycleListener.afterStart();
     }
 
+    private void closeAndRemoveProcessor() {
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          sp.getLeft().stop();
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+        }
+      });
+      processors.removeIf(pair -> pair.getLeft().equals(processor));
+    }
     @Override
     public void afterStop() {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      closeAndRemoveProcessor();
       // successful shutdown
       handleProcessorShutdown(null);
     }
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
+      // we need to close the associated coordinator metadata store even though the processor failed
+      closeAndRemoveProcessor();
 
       // the processor stopped with failure, this is logging the first processor's failure as the cause of
       // the whole application failure
       if (failure.compareAndSet(null, t)) {
         // shutdown the other processors
         processors.forEach(sp -> {
-          sp.getLeft().stop();    // Stop StreamProcessor
-          sp.getRight().close();  // Close associated coordinator metadata store
+          sp.getLeft().stop();     // Stop StreamProcessor
+          if (sp.getRight() != null) {
+            sp.getRight().close(); // Close associated coordinator metadata store
+          }
         });
       }
 
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java
new file mode 100644
index 0000000..5aab8c1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobCoordinatorMetadataMixIn.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.JobCoordinatorMetadata} to/from JSON
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonJobCoordinatorMetadataMixIn {
+
+  @JsonCreator
+  public JsonJobCoordinatorMetadataMixIn(@JsonProperty("epoch-id") String epochId,
+      @JsonProperty("config-id") String configId, @JsonProperty("job-model-id") String jobModelId) {
+  }
+
+  @JsonProperty("epoch-id")
+  abstract String getEpochId();
+
+  @JsonProperty("config-id")
+  abstract String getConfigId();
+
+  @JsonProperty("job-model-id")
+  abstract String getJobModelId();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index db147f0..3a0205d 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -27,6 +27,7 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -137,6 +138,12 @@
     mapper.getSerializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
 
+    // Register mixins for job coordinator metadata model
+    mapper.getSerializationConfig()
+        .addMixInAnnotations(JobCoordinatorMetadata.class, JsonJobCoordinatorMetadataMixIn.class);
+    mapper.getDeserializationConfig()
+        .addMixInAnnotations(JobCoordinatorMetadata.class, JsonJobCoordinatorMetadataMixIn.class);
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);
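
For context, a hedged sketch of how the newly registered mix-in is expected to round-trip JobCoordinatorMetadata through JSON. It assumes SamzaObjectMapper exposes the static getObjectMapper() factory used elsewhere in Samza, and that JobCoordinatorMetadata has a constructor and getters matching the @JsonCreator and abstract getters declared in the mix-in above; the exact JSON shown in the comment is illustrative.

    import org.apache.samza.job.JobCoordinatorMetadata;
    import org.apache.samza.serializers.model.SamzaObjectMapper;
    import org.codehaus.jackson.map.ObjectMapper;

    public class JobCoordinatorMetadataJsonExample {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
        // Assumed constructor order, mirroring the @JsonCreator parameters in the mix-in.
        JobCoordinatorMetadata metadata = new JobCoordinatorMetadata("epoch-1", "config-1", "job-model-1");
        String json = mapper.writeValueAsString(metadata); // e.g. {"epoch-id":"epoch-1","config-id":"config-1","job-model-id":"job-model-1"}
        JobCoordinatorMetadata roundTripped = mapper.readValue(json, JobCoordinatorMetadata.class);
        System.out.println(json + " -> " + roundTripped.getEpochId());
      }
    }
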
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 5eda128..1fb49a9 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -140,7 +140,10 @@
     SystemConfig systemConfig = new SystemConfig(config);
     storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
       // Load system admin for this system.
-      SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
+      SystemAdmin systemAdmin = systemConfig
+          .getSystemFactories()
+          .get(systemStream.getSystem())
+          .getAdmin(systemStream.getSystem(), config, ChangelogStreamManager.class.getSimpleName());
 
       if (systemAdmin == null) {
         throw new SamzaException(String.format(
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6224a3e..b1e4206 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -46,7 +46,6 @@
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
@@ -166,7 +165,6 @@
     }
     // if diagnostics is enabled, create the diagnostics stream if it doesn't exist
 
-    SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
     String diagnosticsSystemStreamName = new MetricsConfig(config)
         .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
         .orElseThrow(() -> new ConfigException("Missing required config: " +
@@ -174,7 +172,9 @@
                 MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
 
     SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
-    SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+    SystemConfig systemConfig = new SystemConfig(config);
+    SystemAdmin diagnosticsSysAdmin = systemConfig.getSystemFactories().get(diagnosticsSystemStream.getSystem())
+        .getAdmin(diagnosticsSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
     StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
         diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
 
diff --git a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 93536fa..a5acdca 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,9 @@
       .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
 
     var systemStreams = Seq.empty[String]
-    val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName, this.getClass.getSimpleName)
+    val systemConfig = new SystemConfig(config)
+    val systemAdmin = systemConfig.getSystemFactories
+      .get(systemName).getAdmin(systemName, config, this.getClass.getSimpleName)
     try {
       systemAdmin.start()
       systemStreams =
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 91fec28..96588c9 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -64,4 +64,10 @@
 
   val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
   val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
+
+  val mFaultDomainAwareContainerRequests = newGauge("fault-domain-aware-container-requests", () => state.faultDomainAwareContainerRequests.get())
+  val mFaultDomainAwareContainersStarted = newGauge("fault-domain-aware-containers-started", () => state.faultDomainAwareContainersStarted.get())
+  val mExpiredFaultDomainAwareContainerRequests = newGauge("expired-fault-domain-aware-container-requests", () => state.expiredFaultDomainAwareContainerRequests.get())
+  val mFailedFaultDomainAwareContainerRequests = newGauge("failed-fault-domain-aware-container-requests", () => state.failedFaultDomainAwareContainerAllocations.get())
+
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 254f86d..0462663 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,6 +24,7 @@
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
+import org.apache.samza.coordinator.CoordinationConstants
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
@@ -54,11 +55,12 @@
    * @param config to create coordinator stream.
    */
   def createCoordinatorStream(config: Config): Unit = {
-    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
-
     info("Creating coordinator stream")
     val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    val systemConfig = new SystemConfig(config)
+    val coordinatorSystemAdmin = systemConfig.getSystemFactories.get(coordinatorSystemStream.getSystem)
+      .getAdmin(coordinatorSystemStream.getSystem, config, classOf[DiagnosticsUtil].getSimpleName)
+
     coordinatorSystemAdmin.start()
     CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
     coordinatorSystemAdmin.stop()
@@ -197,6 +199,11 @@
         keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
       }
 
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {
+        // If AM HA is enabled, retain the AM URL: running containers keep fetching it from the coordinator stream until the new AM publishes a new URL.
+        keysToRemove = keysToRemove.filter(configKey => !(configKey.equals(CoordinationConstants.YARN_COORDINATOR_URL)))
+      }
+
       info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
       keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
     }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
 
 public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
+  private Semaphore expiredRequestSemaphore = new Semaphore(0);
+  private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+  private volatile boolean overrideIsRequestExpired = false;
+
+  // Create a MockContainerAllocator with certain config overrides
+  public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+      ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+      ContainerManager containerManager, Config overrideConfig) {
+    Map<String, String> mergedConfig = new HashMap<>();
+    mergedConfig.putAll(config);
+    mergedConfig.putAll(overrideConfig);
+    return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+  }
+
   public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
                                 Config config, SamzaApplicationState state, ContainerManager containerManager) {
     super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
     super.requestResources(processorToHostMapping);
   }
 
+  public void setOverrideIsRequestExpired() {
+    overrideIsRequestExpired = true;
+  }
+
+  public int getExpiredRequestCallCount() {
+    return expiredRequestCallCount.get();
+  }
+
+  @Override
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
+    if (!overrideIsRequestExpired) {
+      // if not set to override, then return the original result
+      return super.isRequestExpired(request);
+    }
+    expiredRequestSemaphore.release();
+    expiredRequestCallCount.incrementAndGet();
+    return true;
+  }
+
+  public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+    return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   public ResourceRequestState getContainerRequestState() throws Exception {
     Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
     field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java
new file mode 100644
index 0000000..0c3b455
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class MockFaultDomainManager implements FaultDomainManager {
+
+  private final Multimap<String, FaultDomain> hostToFaultDomainMap;
+
+  public MockFaultDomainManager(MetricsRegistry metricsRegistry) {
+    FaultDomain faultDomain1 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain2 = new FaultDomain(FaultDomainType.RACK, "rack-2");
+    FaultDomain faultDomain3 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain4 = new FaultDomain(FaultDomainType.RACK, "rack-2");
+    FaultDomain faultDomain5 = new FaultDomain(FaultDomainType.RACK, "rack-3");
+    hostToFaultDomainMap = HashMultimap.create();
+    hostToFaultDomainMap.put("host-1", faultDomain1);
+    hostToFaultDomainMap.put("host-2", faultDomain2);
+    hostToFaultDomainMap.put("host-3", faultDomain3);
+    hostToFaultDomainMap.put("host-4", faultDomain4);
+    hostToFaultDomainMap.put("host-5", faultDomain5);
+  }
+
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToFaultDomainMap.values());
+  }
+
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    return new HashSet<>(hostToFaultDomainMap.get(host));
+  }
+
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    return hostToFaultDomainMap.get(host1).equals(hostToFaultDomainMap.get(host2));
+  }
+
+}
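
A brief sketch of how a test might exercise this mock, assuming FaultDomain defines value-based equality (which hasSameFaultDomains relies on); the class below is illustrative, not part of the patch.

    import org.apache.samza.clustermanager.MockFaultDomainManager;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class MockFaultDomainManagerExample {
      public static void main(String[] args) {
        MockFaultDomainManager fdm = new MockFaultDomainManager(new MetricsRegistryMap());
        // host-1 and host-3 are both mapped to rack-1 above, so they should share a fault domain...
        System.out.println(fdm.hasSameFaultDomains("host-1", "host-3"));
        // ...while host-1 (rack-1) and host-2 (rack-2) should not.
        System.out.println(fdm.hasSameFaultDomains("host-1", "host-2"));
      }
    }
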
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java
new file mode 100644
index 0000000..4dc4dce
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManagerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class MockFaultDomainManagerFactory implements FaultDomainManagerFactory {
+
+  @Override
+  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+    return new MockFaultDomainManager(metricsRegistry);
+  }
+
+}
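
This factory is wired into the tests via the cluster-manager.fault-domain-manager.factory config key (see the TestClusterBasedJobCoordinator change below). As a hypothetical illustration only, a factory of this kind could be resolved from config by plain reflection; the actual loading utility Samza uses is not part of this diff.

    import org.apache.samza.clustermanager.FaultDomainManager;
    import org.apache.samza.clustermanager.FaultDomainManagerFactory;
    import org.apache.samza.config.Config;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public final class FaultDomainManagerLoader {
      // Hypothetical helper: plain reflection stands in for whatever factory-loading utility Samza actually uses.
      public static FaultDomainManager load(Config config) throws Exception {
        String factoryClass = config.get("cluster-manager.fault-domain-manager.factory");
        FaultDomainManagerFactory factory =
            (FaultDomainManagerFactory) Class.forName(factoryClass).getDeclaredConstructor().newInstance();
        return factory.getFaultDomainManager(config, new MetricsRegistryMap());
      }
    }
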
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index caa1ffe..e0b0739 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -32,11 +32,14 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.MockSystemFactory;
@@ -47,7 +50,6 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.mockito.exceptions.base.MockitoException;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -58,6 +60,8 @@
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -84,6 +88,7 @@
     configMap.put("task.inputs", "kafka.topic1");
     configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
     configMap.put("samza.cluster-manager.factory", "org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("cluster-manager.fault-domain-manager.factory", "org.apache.samza.clustermanager.MockFaultDomainManagerFactory");
     configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
 
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
@@ -150,7 +155,7 @@
     Config config = new MapConfig(configMap);
     MockitoException stopException = new MockitoException("Stop");
 
-    ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
+    ClusterBasedJobCoordinator clusterCoordinator = spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
     ContainerProcessManager mockContainerProcessManager = mock(ContainerProcessManager.class);
     doReturn(true).when(mockContainerProcessManager).shouldShutdown();
     StartpointManager mockStartpointManager = mock(StartpointManager.class);
@@ -174,6 +179,43 @@
   }
 
   @Test
+  public void testVerifyShouldFanoutStartpointWithoutAMHA() {
+    Config jobConfig = new MapConfig(configMap);
+
+    when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(true);
+    assertTrue("Startpoint should fanout even if metadata changed",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+    when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(false);
+    assertTrue("Startpoint should fanout even if metadata remains unchanged",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+  }
+
+  @Test
+  public void testVerifyShouldFanoutStartpointWithAMHA() {
+    Config jobConfig = new MapConfig(configMap);
+
+    when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    when(clusterBasedJobCoordinator.isApplicationMasterHighAvailabilityEnabled()).thenReturn(true);
+
+    when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(true);
+    assertTrue("Startpoint should fanout with change in metadata",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+    when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(false);
+    assertFalse("Startpoint fan out shouldn't happen when metadata is unchanged",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+  }
+
+  @Test
   public void testToArgs() {
     ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
         JobConfig.JOB_NAME, "test1",
@@ -192,4 +234,39 @@
     assertEquals(expected.size(), actual.size());
     assertTrue(actual.containsAll(expected));
   }
+
+  @Test
+  public void testGenerateAndUpdateJobCoordinatorMetadata() {
+    Config jobConfig = new MapConfig(configMap);
+    when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    JobCoordinatorMetadata previousMetadata = mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = mock(JobCoordinatorMetadataManager.class);
+    JobModel mockJobModel = mock(JobModel.class);
+
+    when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+    when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(), any())).thenReturn(newMetadata);
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata)).thenReturn(false);
+    when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
+
+    /*
+     * Verify that when there are no changes to the metadata, the metadata-changed flag remains false and there are
+     * no interactions with the job coordinator metadata manager
+     */
+    clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertFalse("JC metadata changed should remain unchanged",
+        clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+    verify(jobCoordinatorMetadataManager, times(0)).writeJobCoordinatorMetadata(any());
+
+    /*
+     * Verify that when there are changes to the metadata, we persist the new metadata and update the metadata-changed flag
+     */
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata)).thenReturn(true);
+    clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertTrue("JC metadata changed should be true", clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+    verify(jobCoordinatorMetadataManager, times(1)).writeJobCoordinatorMetadata(newMetadata);
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2b4a4b0..2c9ba81 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -64,6 +64,7 @@
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
   private ContainerManager containerManager;
 
@@ -89,7 +90,7 @@
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -369,7 +370,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
+        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -416,7 +417,7 @@
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
     // Request Preferred Resources
@@ -460,7 +461,7 @@
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
@@ -513,7 +514,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..1f063d7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -61,6 +62,7 @@
 
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
   private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
 
   private CoordinatorStreamStore coordinatorStreamStore;
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
@@ -82,7 +84,7 @@
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager, faultDomainManager, config));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -165,6 +167,58 @@
   }
 
   /**
+   * See SAMZA-2601: we want to prevent an infinite loop in the case of an expired request call with host affinity
+   * disabled. This test makes sure we don't hit that infinite loop.
+   */
+  @Test
+  public void testExpiredRequestInfiniteLoop() throws Exception {
+    Config override = new MapConfig(new HashMap<String, String>() {
+      {
+        // override to have a proper sleep interval for this test
+        put("cluster-manager.allocator.sleep.ms", "100");
+      }
+    });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, manager, false,
+        false, mockLocalityManager, faultDomainManager, config);
+    containerAllocator =
+        MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+            containerManager,
+            override);
+    MockContainerAllocatorWithoutHostAffinity mockAllocator =
+        (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+    mockAllocator.setOverrideIsRequestExpired();
+    allocatorThread = new Thread(containerAllocator);
+
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", null);
+        put("1", null);
+        put("2", null);
+        put("3", null);
+      }
+    };
+
+    allocatorThread.start();
+
+    mockAllocator.requestResources(containersToHostMapping);
+    // Wait until at least one expired request call has been made, which should happen.
+    // If the test passes, this returns almost immediately (within 100 ms). Only when the test fails does it exhaust
+    // the timeout, which is a wait worth paying to surface the failure.
+    assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
+    Thread.sleep(500);
+    // Given that we wait 500 ms above with a sleep interval of 100 ms, isRequestExpired should be called roughly
+    // 5 times. We allow a generous buffer here (< 100) because, if we did hit an infinite loop, isRequestExpired
+    // would be called millions of times (4~5 million after about a dozen runs on my machine).
+    assertTrue(
+        String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+        mockAllocator.getExpiredRequestCallCount() < 100);
+  }
+
+  /**
    * Test requestContainers with containerToHostMapping with host.affinity disabled
    */
   @Test
@@ -280,7 +334,7 @@
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
@@ -294,6 +348,8 @@
     spyThread = new Thread(spyAllocator, "Container Allocator Thread");
     // Start the container allocator thread periodic assignment
     spyThread.start();
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
     Thread.sleep(1000);
     // Verify that all the request that were created were "ANY_HOST" requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 53bd5b0..e5ead9e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -141,16 +141,17 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     localityManager = mock(LocalityManager.class);
     when(localityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of(
             "0", new ProcessorLocality("0", "host-1"),
             "1", new ProcessorLocality("1", "host-2"))));
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
+            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
   }
 
   @After
@@ -171,12 +172,13 @@
     state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager, false);
   }
 
   @Test(timeout = 10000)
@@ -552,11 +554,12 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -666,15 +669,16 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager, faultDomainManager, config);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state,
             containerManager);
 
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(false, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager, false);
 
     // Mimic Cluster Manager returning any request
     doAnswer(new Answer<Void>() {
@@ -801,13 +805,14 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(true, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 5e550cf..ad45c5e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -31,6 +31,7 @@
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
@@ -140,11 +141,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -170,7 +172,8 @@
         clusterResourceManager,
         Optional.empty(),
         containerManager,
-        mockLocalityManager
+        mockLocalityManager,
+        false
     );
 
     allocator =
@@ -227,6 +230,107 @@
   }
 
   @Test
+  public void testOnInitAMHighAvailability() throws Exception {
+    Map<String, String> configMap = new HashMap<>(configVals);
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+    Config conf = new MapConfig(configMap);
+
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
+    state.runningProcessors.put("0", new SamzaResource(1, 1024, "host", "0"));
+
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
+    ContainerManager containerManager =
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
+
+    MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
+        clusterResourceManager,
+        conf,
+        state,
+        containerManager);
+
+    getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
+    CountDownLatch latch = new CountDownLatch(1);
+    getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
+      public void run() {
+        isRunning = true;
+        latch.countDown();
+      }
+    });
+
+    cpm.start();
+
+    if (!latch.await(2, TimeUnit.SECONDS)) {
+      Assert.fail("timed out waiting for the latch to expire");
+    }
+
+    // Verify Allocator thread has started running
+    assertTrue(isRunning);
+
+    // Verify only 1 was requested with allocator
+    assertEquals(1, allocator.requestedContainers);
+    assertTrue("Ensure no processors were forcefully restarted", callback.resourceStatuses.isEmpty());
+
+    cpm.stop();
+  }
+
+  @Test
+  public void testOnInitToForceRestartAMHighAvailability() throws Exception {
+    Map<String, String> configMap = new HashMap<>(configVals);
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+    Config conf = new MapConfig(configMap);
+    SamzaResource samzaResource = new SamzaResource(1, 1024, "host", "0");
+
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
+    state.runningProcessors.put("0", samzaResource);
+
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    ClusterResourceManager clusterResourceManager = spy(new MockClusterResourceManager(callback, state));
+    ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
+    ContainerManager containerManager =
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty(), true);
+
+    MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
+        clusterResourceManager,
+        conf,
+        state,
+        containerManager);
+
+    getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
+    CountDownLatch latch = new CountDownLatch(1);
+    getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
+      public void run() {
+        isRunning = true;
+        latch.countDown();
+      }
+    });
+
+    cpm.start();
+
+    if (!latch.await(2, TimeUnit.SECONDS)) {
+      Assert.fail("timed out waiting for the latch to expire");
+    }
+
+    verify(clusterResourceManager, times(1)).stopStreamProcessor(samzaResource);
+    assertEquals("CPM should stop the running container", 1, callback.resourceStatuses.size());
+
+    SamzaResourceStatus actualResourceStatus = callback.resourceStatuses.get(0);
+    assertEquals("Container 0 should be stopped", "0", actualResourceStatus.getContainerId());
+    assertEquals("Container 0 should have exited with preempted status", SamzaResourceStatus.PREEMPTED,
+        actualResourceStatus.getExitCode());
+    cpm.stop();
+  }
+
+  @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
@@ -489,6 +593,7 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
 
     if (withHostAffinity) {
@@ -501,7 +606,7 @@
 
     ContainerManager containerManager =
         buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager);
+            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -510,7 +615,8 @@
         containerManager);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager);
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator),
+            mockLocalityManager, false, faultDomainManager);
 
     // start triggers a request
     cpm.start();
@@ -651,11 +757,12 @@
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("1", "host1"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -665,7 +772,7 @@
 
     ContainerProcessManager manager =
         new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager, mockLocalityManager);
+            Optional.of(allocator), containerManager, mockLocalityManager, false);
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -688,11 +795,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -701,7 +809,8 @@
         containerManager);
 
     ContainerProcessManager cpm =
-        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager));
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager,
+            Optional.of(allocator), mockLocalityManager, false, faultDomainManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -926,28 +1035,36 @@
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
       boolean hostAffinityEnabled, boolean standByEnabled) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     return buildContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+        hostAffinityEnabled, standByEnabled, mockLocalityManager, faultDomainManager);
   }
 
   private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
-      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
-    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, localityManager);
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, boolean hostAffinityEnabled,
+      boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager) {
+    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager, hostAffinityEnabled, standByEnabled, localityManager, faultDomainManager, config);
   }
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
-    return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator, mockLocalityManager);
+    return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator, false);
   }
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
-      ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager) {
+      ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, boolean restartContainer) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator,
+        mockLocalityManager, restartContainer, faultDomainManager);
+  }
+
+  private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
+      ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager,
+      boolean restartContainers, FaultDomainManager faultDomainManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
         allocator, buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager), localityManager);
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager, faultDomainManager), localityManager, restartContainers);
   }
 }
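
For context on the pattern above: each test now threads a mocked FaultDomainManager through the builder overloads, so rack awareness can be stubbed per test without touching callers that do not care about it. The following is a minimal, self-contained sketch of that stubbing style; RackAware is a local stand-in interface, not the Samza class.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Set;

public class FaultDomainMockSketch {
  // Local stand-in for the fault-domain abstraction the tests mock out.
  public interface RackAware {
    Set<String> getFaultDomainsForHost(String host);
  }

  public static void main(String[] args) {
    RackAware faultDomains = mock(RackAware.class);
    // Rack-aware tests stub only the hosts they exercise; other tests can
    // pass the unstubbed mock straight into the builder overloads.
    when(faultDomains.getFaultDomainsForHost("host1")).thenReturn(Collections.singleton("rack-a"));
    System.out.println(faultDomains.getFaultDomainsForHost("host1")); // prints [rack-a]
  }
}
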
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 2445c00..5c0bf16 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -22,6 +22,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -33,6 +37,7 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -42,6 +47,8 @@
   private Runnable onExpired;
   @Mock
   private ContainerHeartbeatClient containerHeartbeatClient;
+  @Mock
+  private MetadataStore coordinatorStreamStore;
 
   private ScheduledExecutorService scheduler;
   /**
@@ -51,13 +58,18 @@
 
   private ContainerHeartbeatMonitor containerHeartbeatMonitor;
 
+  private static final String COORDINATOR_URL = "http://some-host.prod.linkedin.com";
+  private static final ContainerHeartbeatResponse FAILURE_RESPONSE = new ContainerHeartbeatResponse(false);
+  private static final ContainerHeartbeatResponse SUCCESS_RESPONSE = new ContainerHeartbeatResponse(true);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
     this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
     this.containerHeartbeatMonitor =
-        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, false, 5, 10);
   }
 
   @Test
@@ -93,6 +105,99 @@
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should not have been submitted
+    verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+    verify(this.onExpired, never()).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testFailedToFetchNewAMCoordinatorUrl() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(COORDINATOR_URL));
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testConnectToNewAMFailed() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  @Test
+  public void testConnectToNewAMSerdeException() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed"));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.
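
The four tests added above exercise the AM high-availability path of the heartbeat monitor: on a failed heartbeat it looks up the (possibly new) coordinator URL in the coordinator stream store, builds a fresh heartbeat client against it, and only escalates to the expiry callback if that retry also fails or the lookup itself throws. Below is a rough, self-contained sketch of that control flow; Client and the functional parameters are stand-ins with hypothetical names, not the actual Samza types.

import java.util.function.Function;
import java.util.function.Supplier;

public class HeartbeatFailoverSketch {
  interface Client { boolean heartbeatAlive(); }

  static boolean checkWithFailover(Client current,
                                   Supplier<String> coordinatorUrlLookup,   // reads the coordinator stream store
                                   Function<String, Client> clientFactory,  // builds a client against the new AM
                                   Runnable onExpired) {
    if (current.heartbeatAlive()) {
      return true;
    }
    try {
      String newUrl = coordinatorUrlLookup.get();           // may return the same URL or a new AM's URL
      Client reconnected = clientFactory.apply(newUrl);
      if (reconnected.heartbeatAlive()) {
        return true;                                        // new AM accepted the container; keep running
      }
    } catch (RuntimeException e) {
      // lookup or deserialization failed; fall through to expiry
    }
    onExpired.run();                                        // schedule container shutdown
    return false;
  }

  public static void main(String[] args) {
    boolean alive = checkWithFailover(() -> false,
        () -> "http://new-am:1234",
        url -> () -> true,
        () -> System.out.println("expired"));
    System.out.println(alive); // true: failover to the new AM succeeded
  }
}
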
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
new file mode 100644
index 0000000..2ecd88c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionContainerIdManager {
+
+  private static final Config
+      CONFIG = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
+  private MetadataStore store;
+  private ExecutionContainerIdManager executionContainerIdManager;
+
+  @Before
+  public void setup() {
+    coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    store = Mockito.spy(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
+        SetExecutionEnvContainerIdMapping.TYPE));
+    executionContainerIdManager = new ExecutionContainerIdManager(store);
+
+  }
+
+  @After
+  public void tearDown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
+  @Test
+  public void testExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = "0";
+
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+    Map<String, String> localMap = executionContainerIdManager.readExecutionEnvironmentContainerIdMapping();
+
+    Map<String, String> expectedMap = ImmutableMap.of(processorId, physicalId);
+    assertEquals(expectedMap, localMap);
+
+    executionContainerIdManager.close();
+
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer producer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer consumer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+
+    ArgumentCaptor<byte[]> argument1 = ArgumentCaptor.forClass(byte[].class);
+    Mockito.verify(store).put(Mockito.eq(processorId), argument1.capture());
+    CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+    assertEquals(physicalId, valueSerde.fromBytes(argument1.getValue()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInvalidKeyExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = null;
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+  @Test(expected = NullPointerException.class)
+  public void testInvalidValueExecutionContainerIdManager() {
+    String physicalId = null;
+    String processorId = "0";
+    executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId);
+  }
+}
+
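
The test above asserts that the manager serde-encodes the physical container id before handing it to the namespace-aware store, and that a read returns the same processor-to-container mapping (with null keys or values rejected). A compact stand-alone sketch of that write/read round trip, using a plain in-memory map and a trivial string serde in place of the coordinator stream store; both are stand-ins, not Samza classes.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ContainerIdMappingSketch {
  private final Map<String, byte[]> store = new HashMap<>();   // stands in for the metadata store

  void write(String processorId, String physicalId) {
    if (processorId == null || physicalId == null) {
      throw new NullPointerException("processorId and physicalId must be non-null");
    }
    store.put(processorId, physicalId.getBytes(StandardCharsets.UTF_8));  // trivial serde
  }

  Map<String, String> readAll() {
    return store.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey,
            e -> new String(e.getValue(), StandardCharsets.UTF_8)));
  }

  public static void main(String[] args) {
    ContainerIdMappingSketch m = new ContainerIdMappingSketch();
    m.write("0", "container_123_123_123");
    System.out.println(m.readAll()); // {0=container_123_123_123}
  }
}
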
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
new file mode 100644
index 0000000..70e65a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.ClusterType;
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
+import static org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * A test class for {@link JobCoordinatorMetadataManager}
+ */
+public class TestJobCoordinatorMetadataManager {
+  private static final String OLD_CONFIG_ID = "1";
+  private static final String OLD_JOB_MODEL_ID = "1";
+  private static final String OLD_EPOCH_ID = "1606797336059" + CONTAINER_ID_DELIMITER + "0010";
+  private static final String OLD_CONTAINER_ID = "CONTAINER" + CONTAINER_ID_DELIMITER + OLD_EPOCH_ID +
+      CONTAINER_ID_DELIMITER + "00002";
+
+  private static final String NEW_CONFIG_ID = "2";
+  private static final String NEW_JOB_MODEL_ID = "2";
+  private static final String NEW_EPOCH_ID = "1606797336059" + CONTAINER_ID_DELIMITER + "0011";
+
+  private static final Config OLD_CONFIG = new MapConfig(
+      ImmutableMap.of(
+          "job.autosizing.enabled", "true",
+          "job.autosizing.cpu.core", "16"));
+
+  private static final Config NEW_CONFIG = new MapConfig(
+      ImmutableMap.of(
+          "job.autosizing.enabled", "true",
+          "job.autosizing.cpu.core", "24"));
+
+  private static final Config COORDINATOR_STORE_CONFIG =
+      new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
+
+  private JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private Map<String, ContainerModel> containerModelMap;
+  private MetadataStore metadataStore;
+
+  @Before
+  public void setup() {
+    Map<TaskName, TaskModel> tasksForContainer1 = ImmutableMap.of(
+        new TaskName("t1"), new TaskModel(new TaskName("t1"), ImmutableSet.of(), new Partition(0)),
+        new TaskName("t2"), new TaskModel(new TaskName("t2"), ImmutableSet.of(), new Partition(1)));
+    Map<TaskName, TaskModel> tasksForContainer2 = ImmutableMap.of(
+        new TaskName("t3"), new TaskModel(new TaskName("t3"), ImmutableSet.of(), new Partition(2)),
+        new TaskName("t4"), new TaskModel(new TaskName("t4"), ImmutableSet.of(), new Partition(3)),
+        new TaskName("t5"), new TaskModel(new TaskName("t5"), ImmutableSet.of(), new Partition(4)));
+    ContainerModel containerModel1 = new ContainerModel("0", tasksForContainer1);
+    ContainerModel containerModel2 = new ContainerModel("1", tasksForContainer2);
+    containerModelMap = ImmutableMap.of("0", containerModel1, "1", containerModel2);
+    CoordinatorStreamStoreTestUtil mockCoordinatorStreamStore =
+        new CoordinatorStreamStoreTestUtil(COORDINATOR_STORE_CONFIG);
+    metadataStore = spy(new NamespaceAwareCoordinatorStreamStore(
+        mockCoordinatorStreamStore.getCoordinatorStreamStore(), SetJobCoordinatorMetadataMessage.TYPE));
+    jobCoordinatorMetadataManager = spy(new JobCoordinatorMetadataManager(metadataStore,
+        ClusterType.YARN, new MetricsRegistryMap()));
+  }
+
+  @Test
+  public void testCheckForMetadataChanges() {
+    JobCoordinatorMetadata previousMetadata = new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadataWithDifferentEpochId =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+
+    boolean metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentEpochId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+        jobCoordinatorMetadataManager.getNewDeployment().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithDifferentConfigId =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, OLD_JOB_MODEL_ID);
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentConfigId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("Config across application attempts should be 1", 1,
+        jobCoordinatorMetadataManager.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithDifferentJobModelId =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, NEW_JOB_MODEL_ID);
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithDifferentJobModelId);
+    assertTrue("Metadata check should return true", metadataChanged);
+    assertEquals("Job model changed across application attempts should be 1", 1,
+        jobCoordinatorMetadataManager.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
+
+    JobCoordinatorMetadata newMetadataWithNoChange =
+        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    assertEquals("Application attempt count should be 0", 0,
+        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+
+    metadataChanged =
+        jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
+    assertFalse("Metadata check should return false", metadataChanged);
+    assertEquals("Application attempt count should be 1", 1,
+        jobCoordinatorMetadataManager.getApplicationAttemptCount().getCount());
+  }
+
+  @Test
+  public void testGenerateJobCoordinatorMetadataFailed() {
+    doThrow(new RuntimeException("Failed to generate epoch id"))
+        .when(jobCoordinatorMetadataManager).fetchEpochIdForJobCoordinator();
+
+    try {
+      jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+      fail("Expected generate job coordinator metadata to throw exception");
+    } catch (Exception e) {
+      assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
+      assertEquals("Metadata generation failed count should be 1", 1,
+          jobCoordinatorMetadataManager.getMetadataGenerationFailedCount().getCount());
+    }
+  }
+
+  @Test
+  public void testGenerateJobCoordinatorMetadataForRepeatability() {
+    when(jobCoordinatorMetadataManager.getEnvProperty(CONTAINER_ID_PROPERTY))
+        .thenReturn(OLD_CONTAINER_ID);
+    JobCoordinatorMetadata expectedMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+
+    assertEquals("Mismatch in epoch identifier.", OLD_EPOCH_ID, expectedMetadata.getEpochId());
+
+    JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+    assertEquals("Expected repeatable job coordinator metadata", expectedMetadata, actualMetadata);
+  }
+
+  @Test
+  public void testGenerateJobCoordinatorMetadataWithConfigChanges() {
+    when(jobCoordinatorMetadataManager.getEnvProperty(CONTAINER_ID_PROPERTY))
+        .thenReturn(OLD_CONTAINER_ID);
+    JobCoordinatorMetadata expectedMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+
+    Map<String, String> additionalConfig = new HashMap<>();
+    additionalConfig.put("yarn.am.high-availability.enabled", "true");
+
+    additionalConfig.putAll(OLD_CONFIG);
+    Config modifiedConfig = new MapConfig(additionalConfig);
+    JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(
+        new JobModel(modifiedConfig, containerModelMap), modifiedConfig);
+    assertEquals("Job coordinator metadata should remain the same", expectedMetadata, actualMetadata);
+  }
+
+  @Test
+  public void testReadJobCoordinatorMetadataFailed() {
+    JobCoordinatorMetadata jobCoordinatorMetadata =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+    Serde<String> mockSerde = spy(new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
+    doThrow(new RuntimeException("Failed to read coordinator stream"))
+        .when(mockSerde).fromBytes(any());
+
+    jobCoordinatorMetadataManager = spy(new JobCoordinatorMetadataManager(metadataStore,
+        ClusterType.YARN, new MetricsRegistryMap(), mockSerde));
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(jobCoordinatorMetadata);
+
+    JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    assertNull("Read failed should return null", actualMetadata);
+    assertEquals("Metadata read failed count should be 1", 1,
+        jobCoordinatorMetadataManager.getMetadataReadFailedCount().getCount());
+  }
+
+  @Test
+  public void testReadWriteJobCoordinatorMetadata() {
+    JobCoordinatorMetadata jobCoordinatorMetadata =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(jobCoordinatorMetadata);
+
+    JobCoordinatorMetadata actualJobCoordinatorMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    assertEquals("Mismatch in job coordinator metadata", jobCoordinatorMetadata, actualJobCoordinatorMetadata);
+  }
+
+  @Test (expected = NullPointerException.class)
+  public void testWriteNullJobCoordinatorMetadataShouldThrowException() {
+    jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(null);
+  }
+
+  @Test
+  public void testWriteJobCoordinatorMetadataBubblesException() {
+    doThrow(new RuntimeException("Failed to write to coordinator stream"))
+        .when(metadataStore).put(anyString(), any());
+    try {
+      jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(mock(JobCoordinatorMetadata.class));
+      fail("Expected write job coordinator metadata to throw exception");
+    } catch (Exception e) {
+      assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
+      assertEquals("Metadata write failed count should be 1", 1,
+          jobCoordinatorMetadataManager.getMetadataWriteFailedCount().getCount());
+    }
+  }
+}
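
As the assertions above suggest, checkForMetadataChanges treats the metadata as a value object with three fields (epoch id, config id, job model id) and bumps a dedicated metric for whichever field differs; only a fully identical triple counts as "no change" and increments the application attempt count instead. A minimal sketch of that comparison, with plain fields standing in for the Samza metadata class:

import java.util.Objects;

public class MetadataChangeSketch {
  static final class Metadata {
    final String epochId, configId, jobModelId;
    Metadata(String epochId, String configId, String jobModelId) {
      this.epochId = epochId; this.configId = configId; this.jobModelId = jobModelId;
    }
  }

  static boolean checkForChanges(Metadata previous, Metadata current) {
    boolean changed = false;
    if (!Objects.equals(previous.epochId, current.epochId)) {
      changed = true;      // a real implementation would bump a "new deployment" metric here
    }
    if (!Objects.equals(previous.configId, current.configId)) {
      changed = true;      // ... a "config changed across attempts" metric here
    }
    if (!Objects.equals(previous.jobModelId, current.jobModelId)) {
      changed = true;      // ... and a "job model changed across attempts" metric here
    }
    return changed;        // false means identical metadata, i.e. an application re-attempt
  }

  public static void main(String[] args) {
    Metadata old = new Metadata("1606797336059-0010", "1", "1");
    System.out.println(checkForChanges(old, new Metadata("1606797336059-0011", "1", "1"))); // true
    System.out.println(checkForChanges(old, new Metadata("1606797336059-0010", "1", "1"))); // false
  }
}
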
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 344f082..324c5e2 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -62,6 +62,8 @@
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -75,6 +77,7 @@
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -135,7 +138,7 @@
     runner.run(externalContext);
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -171,7 +174,7 @@
     runner.run();
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -209,7 +212,7 @@
     runner.waitForFinish();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore, never()).close();
+    verify(coordinatorStreamStore).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -311,10 +314,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+          return null;
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
@@ -326,7 +336,7 @@
     runner.kill();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore).close();
+    verify(coordinatorStreamStore, atLeastOnce()).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -353,10 +363,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+          return null;
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
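
The change above swaps a lambda doAnswer for a stateful Answer so that only the first stop() invocation drives afterStop(); the atLeastOnce() verification of close() suggests stop and close can now be reached more than once during shutdown. A generic sketch of that "first call only" stub in Mockito, with a hypothetical Service interface as the collaborator:

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.concurrent.atomic.AtomicBoolean;

public class FirstCallOnlySketch {
  interface Service { void stop(); }   // hypothetical collaborator

  public static void main(String[] args) {
    Service sp = mock(Service.class);
    AtomicBoolean firstCall = new AtomicBoolean(true);
    doAnswer(invocation -> {
      if (firstCall.getAndSet(false)) {
        System.out.println("running afterStop() side effect once");
      }
      return null;                     // subsequent stop() calls are no-ops
    }).when(sp).stop();

    sp.stop();  // triggers the side effect
    sp.stop();  // silently ignored
  }
}
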
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index ce4ec0b..757e7ae 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -68,7 +68,7 @@
   val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
   val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
-  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+  val systemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
 
   var taskNames: Set[TaskName] = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
@@ -83,19 +83,23 @@
 
   /**
     * Create checkpoint stream prior to start.
+    *
     */
   override def createResources(): Unit = {
-    Preconditions.checkNotNull(systemAdmin)
+    val createResourcesSystemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName + "createResource")
+    Preconditions.checkNotNull(createResourcesSystemAdmin)
+    createResourcesSystemAdmin.start()
+    try {
+      info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
+        s"partition count: ${checkpointSpec.getPartitionCount}")
+      createResourcesSystemAdmin.createStream(checkpointSpec)
 
-    systemAdmin.start()
-
-    info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
-      s"partition count: ${checkpointSpec.getPartitionCount}")
-    systemAdmin.createStream(checkpointSpec)
-
-    if (validateCheckpoint) {
-      info(s"Validating checkpoint stream")
-      systemAdmin.validateStream(checkpointSpec)
+      if (validateCheckpoint) {
+        info(s"Validating checkpoint stream")
+        createResourcesSystemAdmin.validateStream(checkpointSpec)
+      }
+    } finally {
+      createResourcesSystemAdmin.stop()
     }
   }
 
@@ -106,7 +110,8 @@
     // register and start a producer for the checkpoint topic
     info("Starting the checkpoint SystemProducer")
     producerRef.get().start()
-
+    info("Starting the checkpoint SystemAdmin")
+    systemAdmin.start()
     // register and start a consumer for the checkpoint topic
     val oldestOffset = getOldestOffset(checkpointSsp)
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4a4ae7b..edcb159 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -288,7 +288,6 @@
     checkpointManagerOption match {
       case Some(checkpointManager) =>
         checkpointManager.createResources()
-        checkpointManager.stop()
       case _ => throw new ConfigException("No checkpoint manager factory configured")
     }
 
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 7a971a9..ccdd00f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,7 +19,9 @@
 
 package org.apache.samza.job.yarn;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
+import java.util.Set;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +45,7 @@
 import org.apache.samza.clustermanager.ProcessorLaunchException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -192,7 +195,8 @@
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()
     );
     this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
 
@@ -215,7 +219,12 @@
     amClient.start();
     nmClientAsync.init(yarnConfiguration);
     nmClientAsync.start();
-    lifecycle.onInit();
+    Set<ContainerId> previousAttemptsContainers = lifecycle.onInit();
+
+    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+      log.info("Received running containers from previous attempt. Invoking launch success for them.");
+      previousAttemptsContainers.forEach(this::handleOnContainerStarted);
+    }
 
     if (lifecycle.shouldShutdown()) {
       clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
@@ -232,6 +241,7 @@
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
@@ -252,15 +262,15 @@
       Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
       boolean relaxLocality = true;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, null, null, capability, priority, relaxLocality, nodeLabelsExpression);
+          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
       issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
     } else {
       String[] nodes = {preferredHost};
       Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
       boolean relaxLocality = false;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, Arrays.toString(nodes), null, capability, priority, relaxLocality, nodeLabelsExpression);
-      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, null, priority, relaxLocality, nodeLabelsExpression);
+          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
     }
     // ensure that updating the state and making the request are done atomically.
     synchronized (lock) {
@@ -322,9 +332,28 @@
 
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
-      log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
-      this.nmClientAsync.stopContainerAsync(allocatedResources.get(resource).getId(),
-          allocatedResources.get(resource).getNodeId());
+      Container container = allocatedResources.get(resource);
+      String containerId = resource.getContainerId();
+      String containerHost = resource.getHost();
+      /*
+       * 1. Stop the container through the NMClient if the container was instantiated as part of the NMClient lifecycle.
+       * 2. Stop the container through the AMClient by releasing the assigned container if the container was from a
+       *    previous attempt and is managed by the AM due to AM HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
+      if (container != null) {
+        log.info("Stopping Container ID: {} on host: {}", containerId, containerHost);
+        this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(getRunningProcessorId(containerId));
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              containerId, containerHost);
+          amClient.releaseAssignedContainer(yarnContainer.id());
+        } else {
+          log.info("No container with Container ID: {} exists. Ignoring the stop request", containerId);
+        }
+      }
     }
   }
 
@@ -513,22 +542,7 @@
 
   @Override
   public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
-    String processorId = getPendingProcessorId(containerId);
-    if (processorId != null) {
-      log.info("Got start notification for Container ID: {} for Processor ID: {}", containerId, processorId);
-      // 1. Move the processor from pending to running state
-      final YarnContainer container = state.pendingProcessors.remove(processorId);
-
-      state.runningProcessors.put(processorId, container);
-
-      // 2. Invoke the success callback.
-      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
-          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
-      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
-    } else {
-      log.warn("Did not find the Processor ID for the start notification for Container ID: {}. " +
-          "Ignoring notification.", containerId);
-    }
+    handleOnContainerStarted(containerId);
   }
 
   @Override
@@ -726,4 +740,34 @@
     }
     return null;
   }
+
+  /**
+   * Handles the container-started callback for a YARN container.
+   * Updates the YarnAppState's pendingProcessors and runningProcessors
+   * and invokes the clusterManagerCallback's stream processor launch success callback.
+   * @param containerId the YARN container id that has started
+   */
+  private void handleOnContainerStarted(ContainerId containerId) {
+    String processorId = getPendingProcessorId(containerId);
+    if (processorId != null) {
+      log.info("Got start notification for Container ID: {} for Processor ID: {}", containerId, processorId);
+      // 1. Move the processor from pending to running state
+      final YarnContainer container = state.pendingProcessors.remove(processorId);
+
+      state.runningProcessors.put(processorId, container);
+
+      // 2. Invoke the success callback.
+      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+      clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
+    } else {
+      log.warn("Did not find the Processor ID for the start notification for Container ID: {}. " +
+          "Ignoring notification.", containerId);
+    }
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<SamzaResource, Container> getAllocatedResources() {
+    return allocatedResources;
+  }
 }
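
stopStreamProcessor above now has three outcomes: stop via the NM client when the container is in the allocated-resources bookkeeping, release it through the AM client when it is a still-running container adopted from a previous attempt, and ignore the request otherwise. A condensed Java sketch of that decision, with maps and single-method interfaces standing in for the YARN clients:

import java.util.Map;

public class StopContainerSketch {
  interface NmStop { void stop(String containerId); }        // stands in for NMClientAsync
  interface AmRelease { void release(String containerId); }  // stands in for AMRMClientAsync

  static void stopStreamProcessor(String containerId,
                                  Map<String, String> allocatedByThisAttempt,
                                  Map<String, String> runningFromPreviousAttempt,
                                  NmStop nm, AmRelease am) {
    if (allocatedByThisAttempt.containsKey(containerId)) {
      nm.stop(containerId);                       // launched by this AM: stop through the NM client
    } else if (runningFromPreviousAttempt.containsKey(containerId)) {
      am.release(containerId);                    // adopted under AM HA: release the assigned container
    } else {
      System.out.println("No container " + containerId + "; ignoring stop request");
    }
  }

  public static void main(String[] args) {
    stopStreamProcessor("c-1", Map.of("c-1", "c-1"), Map.of(),
        id -> System.out.println("nm stop " + id), id -> System.out.println("am release " + id));
  }
}
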
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
new file mode 100644
index 0000000..2bafa78
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class works under the assumption that job.standbytasks.replication.factor is 2.
+ * For values greater than 2, the standby containers could end up on the same rack as the active container, or on racks that already host other standbys.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private static final Logger log = LoggerFactory.getLogger(FaultDomainManager.class);
+  private static final String FAULT_DOMAIN_MANAGER_GROUP = "yarn-fault-domain-manager";
+  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = "host-to-fault-domain-cache-updates";
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final YarnClientImpl yarnClient;
+  private Counter hostToFaultDomainCacheUpdates;
+
+  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.yarnClient = yarnClient;
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = hostToRackMap;
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  /**
+   * This method returns the last cached rack values for all hosts in the cluster that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack(s) a particular host resides on, based on the internal cache.
+   * If the rack for a host is not present in this cache, the cache is refreshed by recomputing the host to rack map from YARN.
+   * @param host the host
+   * @return the set of {@link FaultDomain}s for the host
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || !hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
+  }
+
+  /**
+   * This method computes the host to rack map from Yarn.
+   * Only the hosts that are running in the cluster will be a part of this map.
+   * @return map of the host and the rack it resides on
+   */
+  @VisibleForTesting
+  Multimap<String, FaultDomain> computeHostToFaultDomainMap() {
+    Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
+    try {
+      List<NodeReport> nodeReport = yarnClient.getNodeReports(NodeState.RUNNING);
+      nodeReport.forEach(report -> {
+        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
+        hostToRackMap.put(report.getNodeId().getHost(), rack);
+      });
+      log.info("Computed the host to rack map successfully from Yarn.");
+    } catch (YarnException | IOException e) {
+      throw new SamzaException("Yarn threw an exception while getting NodeReports.", e);
+    }
+    return hostToRackMap;
+  }
+}
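
The manager above keeps a host-to-rack Multimap as a cache and recomputes it from YARN only when a lookup misses, counting each refresh in a metric. A self-contained sketch of that refresh-on-miss pattern using Guava, with a Supplier standing in for the yarnClient.getNodeReports(...) call:

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class HostToRackCacheSketch {
  private Multimap<String, String> hostToRack;
  private final Supplier<Multimap<String, String>> loader;   // stands in for the YARN node-report call
  private long cacheRefreshes = 0;

  HostToRackCacheSketch(Supplier<Multimap<String, String>> loader) {
    this.loader = loader;
    this.hostToRack = loader.get();
  }

  Set<String> racksForHost(String host) {
    if (!hostToRack.containsKey(host)) {
      hostToRack = loader.get();     // refresh the whole map on a miss
      cacheRefreshes++;
    }
    return new HashSet<>(hostToRack.get(host));
  }

  public static void main(String[] args) {
    HostToRackCacheSketch cache = new HostToRackCacheSketch(() -> {
      Multimap<String, String> m = HashMultimap.create();
      m.put("host1", "rack-a");
      return m;
    });
    System.out.println(cache.racksForHost("host1"));  // [rack-a]
    System.out.println(cache.racksForHost("host2"));  // [] after one refresh attempt
  }
}
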
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java
new file mode 100644
index 0000000..e4e547c
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManagerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainManagerFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * A factory to build a {@link YarnFaultDomainManager}.
+ */
+public class YarnFaultDomainManagerFactory implements FaultDomainManagerFactory {
+  @Override
+  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+    return new YarnFaultDomainManager(metricsRegistry);
+  }
+}
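
A factory like the one above is typically instantiated reflectively from a fully qualified class name supplied in config. A hedged illustration of that loading step follows; the Factory interface, DummyFactory, and the config key mentioned in the comment are hypothetical stand-ins, not the Samza types or key.

public class FactoryLoadingSketch {
  public interface Factory { String create(); }              // stand-in for FaultDomainManagerFactory
  public static class DummyFactory implements Factory {      // stand-in implementation
    public String create() { return "fault-domain-manager"; }
  }

  static Factory loadFactory(String className) throws ReflectiveOperationException {
    // Reflective instantiation from a config value, e.g. a hypothetical
    // "cluster-manager.fault-domain-manager.factory" key.
    return (Factory) Class.forName(className).getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Factory f = loadFactory(FactoryLoadingSketch.DummyFactory.class.getName());
    System.out.println(f.create());
  }
}
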
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 4c3c93e..8e0c3d1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -175,6 +175,10 @@
     appCtx.setApplicationId(appId.get)
     info("set app ID to %s" format appId.get)
 
+    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled) {
+      appCtx.setKeepContainersAcrossApplicationAttempts(true)
+      info("keep containers alive across application attempts for AM High availability")
+    }
     val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]()
     localResources += "__package" -> packageResource
 
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
index 5c0dfac..27e0b1f 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -20,27 +20,32 @@
 package org.apache.samza.job.yarn
 
 import java.io.IOException
+import java.util
+import java.util.HashMap
 
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId, FinalApplicationStatus}
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.apache.hadoop.yarn.exceptions.{InvalidApplicationMasterRequestException, YarnException}
 import org.apache.samza.SamzaException
-import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.clustermanager.{SamzaApplicationState, SamzaResource}
 import SamzaApplicationState.SamzaAppStatus
 import org.apache.samza.util.Logging
 
+import scala.collection.JavaConverters._
+
 /**
  * Responsible for managing the lifecycle of the Yarn application master. Mostly,
  * this means registering and unregistering with the RM, and shutting down
  * when the RM tells us to Reboot.
  */
 //This class is used in the refactored code path as called by run-jc.sh
-class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {
+class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest],
+  isApplicationMasterHighAvailabilityEnabled: Boolean) extends Logging {
   var validResourceRequest = true
   var shutdownMessage: String = null
   var webApp: SamzaYarnAppMasterService = null
-  def onInit() {
+  def onInit(): util.Set[ContainerId] = {
     val host = state.nodeHost
     val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort))
 
@@ -48,6 +53,19 @@
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    val previousAttemptContainers = new util.HashSet[ContainerId]()
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToProcIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToProcIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToProcIdMap.get(ctr.getId.toString)
+        info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
+        samzaAppState.pendingProcessors.put(samzaProcId,
+          new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))
+        state.pendingProcessors.put(samzaProcId, new YarnContainer(ctr))
+        previousAttemptContainers.add(ctr.getId)
+      }
+    }
     info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
 
     if (containerMem > maxMem || containerCpu > maxCpu) {
@@ -57,6 +75,7 @@
       samzaAppState.status = SamzaAppStatus.FAILED;
       samzaAppState.jobHealthy.set(false)
     }
+    previousAttemptContainers
   }
 
   def onReboot() {
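
    To make the new onInit() return value concrete: by the time onInit() returns, the lifecycle has
    already placed the surviving containers into pendingProcessors, and the returned ContainerId set
    tells the caller which containers not to re-request. The sketch below is purely illustrative, not
    the actual YarnClusterResourceManager logic; the stand-in Callback interface mirrors the
    onStreamProcessorLaunchSuccess call verified in the tests further down.

    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.samza.clustermanager.SamzaResource;

    // Illustrative only: how a caller could consume the set returned by onInit().
    final class PreviousAttemptSketch {
      // Stand-in for the relevant part of ClusterResourceManager.Callback.
      interface Callback {
        void onStreamProcessorLaunchSuccess(SamzaResource resource);
      }

      static void reportSurvivingContainers(Set<ContainerId> previousAttemptContainers,
          Map<String, SamzaResource> pendingProcessors, Callback callback) {
        // Each surviving container already has a SamzaResource entry in pendingProcessors, keyed by
        // its Samza processor id; report it as launched instead of requesting a new container for it.
        for (SamzaResource resource : pendingProcessors.values()) {
          if (previousAttemptContainers.stream().anyMatch(id -> id.toString().equals(resource.getContainerId()))) {
            callback.onStreamProcessorLaunchSuccess(resource);
          }
        }
      }
    }
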
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index d787f9e..2b62b96 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -36,6 +36,7 @@
     contentType = "text/html"
   }
 
+  // With AM HA, the uptime and start time of containers (within state) from the previous attempt are reset to the time the new AM comes up.
   get("/") {
     layoutTemplate("/WEB-INF/views/index.scaml",
       "config" -> TreeMap(samzaConfig.sanitize.asScala.toMap.toArray: _*),
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 1ed1d09..89929f7 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -21,6 +21,10 @@
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -39,31 +43,49 @@
 import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaResource;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyObject;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.*;
 
 
 public class TestYarnClusterResourceManager {
 
+  private YarnConfiguration yarnConfiguration;
+  private Config config;
+  private SamzaAppMasterMetrics metrics;
+  private AMRMClientAsync asyncClient;
+  private SamzaYarnAppMasterLifecycle lifecycle;
+  private SamzaYarnAppMasterService service;
+  private NMClientAsync asyncNMClient;
+  private ClusterResourceManager.Callback callback;
+  private YarnAppState yarnAppState;
+
+  @Before
+  public void setup() {
+    yarnConfiguration = mock(YarnConfiguration.class);
+    config = mock(Config.class);
+    metrics = mock(SamzaAppMasterMetrics.class);
+    asyncClient = mock(AMRMClientAsync.class);
+    lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    service = mock(SamzaYarnAppMasterService.class);
+    asyncNMClient = mock(NMClientAsync.class);
+    callback = mock(ClusterResourceManager.Callback.class);
+    yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+  }
+
   @Test
   public void testErrorInStartContainerShouldUpdateState() {
     // create mocks
     final int samzaContainerId = 1;
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
 
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
@@ -87,16 +109,6 @@
 
   @Test
   public void testAllocatedResourceExpiryForYarn() {
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
-
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
         new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
@@ -111,15 +123,7 @@
   @Test
   public void testAMShutdownOnRMCallback() throws IOException, YarnException {
     // create mocks
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient, false));
 
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
@@ -139,15 +143,7 @@
   @Test
   public void testAMShutdownThrowingExceptionOnRMCallback() throws IOException, YarnException {
     // create mocks
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient, false));
 
     doThrow(InvalidApplicationMasterRequestException.class).when(asyncClient).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null);
 
@@ -165,4 +161,90 @@
     verify(service, times(1)).onShutdown();
     verify(metrics, times(1)).stop();
   }
+
+  @Test
+  public void testAMHACallbackInvokedForPreviousAttemptContainers() {
+    String previousAttemptProcessorId = "0";
+    String previousAttemptYarnContainerId = "container_1607304997422_0008_02_000002";
+    // create mocks
+    YarnAppState yarnAppState = Mockito.spy(new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081));
+
+    ContainerId containerId = mock(ContainerId.class);
+    when(containerId.toString()).thenReturn(previousAttemptYarnContainerId);
+
+    YarnContainer yarnContainer = mock(YarnContainer.class);
+    Resource resource = mock(Resource.class);
+    when(resource.getMemory()).thenReturn(1024);
+    Mockito.when(resource.getVirtualCores()).thenReturn(1);
+    Mockito.when(yarnContainer.resource()).thenReturn(resource);
+    Mockito.when(yarnContainer.id()).thenReturn(containerId);
+    NodeId nodeId = mock(NodeId.class);
+    when(nodeId.getHost()).thenReturn("host");
+    when(yarnContainer.nodeId()).thenReturn(nodeId);
+
+    yarnAppState.pendingProcessors.put(previousAttemptProcessorId, yarnContainer);
+
+    Set<ContainerId> previousAttemptContainers = new HashSet<>();
+    previousAttemptContainers.add(containerId);
+    when(lifecycle.onInit()).thenReturn(previousAttemptContainers);
+
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+    Config config = new MapConfig(configMap);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    yarnClusterResourceManager.start();
+    verify(lifecycle).onInit();
+    ArgumentCaptor<SamzaResource> samzaResourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
+    verify(callback).onStreamProcessorLaunchSuccess(samzaResourceArgumentCaptor.capture());
+    SamzaResource samzaResource = samzaResourceArgumentCaptor.getValue();
+    assertEquals(previousAttemptYarnContainerId, samzaResource.getContainerId());
+  }
+
+  @Test
+  public void testStopStreamProcessorForContainerFromPreviousAttempt() {
+    String containerId = "Yarn_Container_id_0";
+    String processorId = "Container_id_0";
+    YarnContainer runningYarnContainer = mock(YarnContainer.class);
+    ContainerId previousRunningContainerId = mock(ContainerId.class);
+    YarnAppState yarnAppState = Mockito.spy(new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081));
+
+    yarnAppState.runningProcessors.put(processorId, runningYarnContainer);
+    when(runningYarnContainer.id()).thenReturn(previousRunningContainerId);
+    when(previousRunningContainerId.toString()).thenReturn(containerId);
+
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    SamzaResource containerResourceFromPreviousRun = mock(SamzaResource.class);
+    when(containerResourceFromPreviousRun.getContainerId()).thenReturn(containerId);
+
+    yarnClusterResourceManager.stopStreamProcessor(containerResourceFromPreviousRun);
+    verify(asyncClient, times(1)).releaseAssignedContainer(previousRunningContainerId);
+  }
+
+  @Test
+  public void testStopStreamProcessorForContainerStartedInCurrentLifecycle() {
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    SamzaResource allocatedContainerResource = mock(SamzaResource.class);
+    Container runningContainer = mock(Container.class);
+    ContainerId runningContainerId = mock(ContainerId.class);
+    NodeId runningNodeId = mock(NodeId.class);
+
+    when(runningContainer.getId()).thenReturn(runningContainerId);
+    when(runningContainer.getNodeId()).thenReturn(runningNodeId);
+
+    yarnClusterResourceManager.getAllocatedResources().put(allocatedContainerResource, runningContainer);
+    yarnClusterResourceManager.stopStreamProcessor(allocatedContainerResource);
+
+    verify(asyncNMClient, times(1)).stopContainerAsync(runningContainerId, runningNodeId);
+  }
 }
\ No newline at end of file
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java
new file mode 100644
index 0000000..9216088
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnFaultDomainManager.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestYarnFaultDomainManager {
+  private final Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
+  private final String hostName1 = "host1";
+  private final String hostName2 = "host2";
+  private final String hostName3 = "host3";
+  private final String hostName4 = "host4";
+  private final String hostName5 = "host5";
+  private final String hostName6 = "host6";
+  private final String rackName1 = "rack1";
+  private final String rackName2 = "rack2";
+  private final String rackName3 = "rack3";
+
+  private final NodeReport nodeReport1 = createNodeReport(hostName1, 1, NodeState.RUNNING, "httpAddress1",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport2 = createNodeReport(hostName2, 1, NodeState.RUNNING, "httpAddress2",
+          rackName2, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport3 = createNodeReport(hostName3, 1, NodeState.RUNNING, "httpAddress3",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport4 = createNodeReport(hostName4, 1, NodeState.RUNNING, "httpAddress4",
+          rackName2, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport5 = createNodeReport(hostName5, 1, NodeState.RUNNING, "httpAddress5",
+          rackName3, 1, 1, 2, 1, 2,
+          "", 60L, null);
+  private final NodeReport nodeReport6 = createNodeReport(hostName6, 1, NodeState.RUNNING, "httpAddress6",
+          rackName1, 1, 1, 2, 1, 2,
+          "", 60L, null);
+
+  @Mock
+  YarnClientImpl yarnClient;
+  @Mock
+  ReadableMetricsRegistry mockMetricsRegistry;
+  @Mock
+  Counter mockCounter;
+
+  @Before
+  public void setup() {
+    FaultDomain rack1 = new FaultDomain(FaultDomainType.RACK, rackName1);
+    FaultDomain rack2 = new FaultDomain(FaultDomainType.RACK, rackName2);
+    FaultDomain rack3 = new FaultDomain(FaultDomainType.RACK, rackName3);
+    hostToRackMap.put(hostName1, rack1);
+    hostToRackMap.put(hostName2, rack2);
+    hostToRackMap.put(hostName3, rack1);
+    hostToRackMap.put(hostName4, rack2);
+    hostToRackMap.put(hostName5, rack3);
+
+    when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
+  }
+
+  @Test
+  public void testGetFaultDomainOfHostWhichExistsInCache() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    Set<FaultDomain> expectedFaultDomainSet = new HashSet<>();
+    expectedFaultDomainSet.add(new FaultDomain(FaultDomainType.RACK, rackName1));
+
+    Set<FaultDomain> actualFaultDomainSet = yarnFaultDomainManager.getFaultDomainsForHost(hostName3);
+
+    assertNotNull(actualFaultDomainSet);
+    assertEquals(expectedFaultDomainSet.iterator().next(), actualFaultDomainSet.iterator().next());
+    verify(mockCounter, times(0)).inc();
+  }
+
+  @Test
+  public void testGetFaultDomainOfHostWhichDoesNotExistInCache() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    Set<FaultDomain> expectedFaultDomainSet = new HashSet<>();
+    expectedFaultDomainSet.add(new FaultDomain(FaultDomainType.RACK, rackName1));
+
+    List<NodeReport> updatedNodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5, nodeReport6);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(updatedNodeReport);
+
+    Set<FaultDomain> actualFaultDomainSet = yarnFaultDomainManager.getFaultDomainsForHost(hostName6);
+
+    assertNotNull(actualFaultDomainSet);
+    assertEquals(expectedFaultDomainSet.iterator().next(), actualFaultDomainSet.iterator().next());
+    verify(mockCounter, times(1)).inc();
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenTrue() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName3);
+
+    assertTrue(result);
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenFalse() {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName2);
+
+    assertFalse(result);
+  }
+
+  @Test
+  public void testHasSameFaultDomainsWhenHostDoesNotExistInCache() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, hostToRackMap);
+
+    List<NodeReport> updatedNodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5, nodeReport6);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(updatedNodeReport);
+
+    boolean result = yarnFaultDomainManager.hasSameFaultDomains(hostName1, hostName6);
+
+    assertTrue(result);
+  }
+
+  @Test
+  public void testComputeHostToFaultDomainMap() throws IOException, YarnException {
+    YarnFaultDomainManager yarnFaultDomainManager = new YarnFaultDomainManager(mockMetricsRegistry, yarnClient, null);
+
+    List<NodeReport> nodeReport = ImmutableList.of(nodeReport1, nodeReport2, nodeReport3, nodeReport4, nodeReport5);
+    when(yarnClient.getNodeReports(NodeState.RUNNING)).thenReturn(nodeReport);
+
+    Multimap<String, FaultDomain> hostToRackMap = yarnFaultDomainManager.computeHostToFaultDomainMap();
+
+    assertEquals(this.hostToRackMap.size(), hostToRackMap.size());
+    assertEquals(this.hostToRackMap.keySet(), hostToRackMap.keySet());
+    Iterator<FaultDomain> expectedValues = this.hostToRackMap.values().iterator();
+    Iterator<FaultDomain> computedValues = hostToRackMap.values().iterator();
+    expectedValues.forEachRemaining(expectedRack -> assertFaultDomainEquals(expectedRack, computedValues.next()));
+  }
+
+  private void assertFaultDomainEquals(FaultDomain faultDomain1, FaultDomain faultDomain2) {
+    assertEquals(faultDomain1.getType(), faultDomain2.getType());
+    assertEquals(faultDomain1.getId(), faultDomain2.getId());
+  }
+
+  private NodeReport createNodeReport(String host, int port, NodeState nodeState, String httpAddress, String rackName,
+                                      int memoryUsed, int vcoresUsed, int totalMemory, int totalVcores, int numContainers,
+                                      String healthReport, long lastHealthReportTime, Set<String> nodeLabels) {
+    return NodeReport.newInstance(NodeId.newInstance(host, port), nodeState, httpAddress, rackName,
+            Resource.newInstance(memoryUsed, vcoresUsed), Resource.newInstance(totalMemory, totalVcores), numContainers,
+            healthReport, lastHealthReportTime, nodeLabels);
+  }
+}
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
index 2664e41..5f78f78 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterLifecycle.scala
@@ -36,9 +36,13 @@
 import org.apache.samza.coordinator.JobModelManager
 import org.junit.Assert._
 import org.junit.Test
-import org.mockito.Mockito
+import org.mockito.{ArgumentCaptor, Mockito}
 
 class TestSamzaYarnAppMasterLifecycle {
+  private def YARN_CONTAINER_ID = "container_123_123_123"
+  private def YARN_CONTAINER_HOST = "host"
+  private def YARN_CONTAINER_MEM = 1024
+  private def YARN_CONTAINER_VCORE = 1
   val coordinator = new JobModelManager(null, null)
   val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
     var host = ""
@@ -60,7 +64,10 @@
         }
         override def getClientToAMTokenMasterKey = null
         override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
-        override def getContainersFromPreviousAttempts(): java.util.List[Container] = java.util.Collections.emptyList[Container]
+        // To test AM high availability, return a running container from the previous attempt.
+        val prevAttemptContainers = new java.util.ArrayList[Container]()
+        prevAttemptContainers.add(getMockContainer)
+        override def getContainersFromPreviousAttempts(): java.util.List[Container] = prevAttemptContainers
         override def getNMTokensFromPreviousAttempts(): java.util.List[NMToken] = java.util.Collections.emptyList[NMToken]
         override def getQueue(): String = null
         override def setContainersFromPreviousAttempts(containers: java.util.List[Container]): Unit = Unit
@@ -92,7 +99,7 @@
     yarnState.rpcUrl = new URL("http://localhost:1")
     yarnState.trackingUrl = new URL("http://localhost:2")
 
-    val saml = new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient)
+    val saml = new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient, false)
     saml.onInit
     assertEquals("testHost", amClient.host)
     assertEquals(1, amClient.port)
@@ -104,7 +111,7 @@
     val state = new SamzaApplicationState(coordinator)
 
     val yarnState =  new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
-    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onShutdown (SamzaAppStatus.SUCCEEDED)
+    new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient, false).onShutdown (SamzaAppStatus.SUCCEEDED)
     assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
   }
 
@@ -115,7 +122,7 @@
       val state = new SamzaApplicationState(coordinator)
 
       val yarnState =  new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
-      new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient).onReboot()
+      new SamzaYarnAppMasterLifecycle(512, 2, state, yarnState, amClient, false).onReboot()
     } catch {
       // expected
       case e: SamzaException => gotException = true
@@ -132,11 +139,62 @@
     yarnState.trackingUrl = new URL("http://localhost:2")
 
     //Request a higher amount of memory from yarn.
-    List(new SamzaYarnAppMasterLifecycle(768, 1, state, yarnState, amClient),
+    List(new SamzaYarnAppMasterLifecycle(768, 1, state, yarnState, amClient, false),
     //Request a higher number of cores from yarn.
-      new SamzaYarnAppMasterLifecycle(368, 3, state, yarnState, amClient)).map(saml => {
+      new SamzaYarnAppMasterLifecycle(368, 3, state, yarnState, amClient, false)).map(saml => {
         saml.onInit
         assertTrue(saml.shouldShutdown)
       })
   }
+
+  @Test
+  def testAMHighAvailabilityOnInit {
+    val PROCESSOR_ID = "0"
+    val samzaApplicationState = new SamzaApplicationState(coordinator)
+
+    samzaApplicationState.processorToExecutionId.put(PROCESSOR_ID, YARN_CONTAINER_ID);
+
+    val yarnState = new YarnAppState(1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 2);
+    yarnState.rpcUrl = new URL("http://localhost:1")
+    yarnState.trackingUrl = new URL("http://localhost:2")
+
+    val saml = new SamzaYarnAppMasterLifecycle(512, 2, samzaApplicationState, yarnState, amClient, true)
+    saml.onInit
+
+    // verify that the samzaApplicationState is updated to reflect a running container from previous attempt
+    assertEquals(1, samzaApplicationState.pendingProcessors.size())
+    assertTrue(samzaApplicationState.pendingProcessors.containsKey(PROCESSOR_ID))
+    val resource = samzaApplicationState.pendingProcessors.get(PROCESSOR_ID)
+    assertEquals(YARN_CONTAINER_ID, resource.getContainerId)
+    assertEquals(YARN_CONTAINER_HOST, resource.getHost)
+    assertEquals(YARN_CONTAINER_MEM, resource.getMemoryMb)
+    assertEquals(YARN_CONTAINER_VCORE, resource.getNumCores)
+
+    assertEquals(1, yarnState.pendingProcessors.size())
+    assertTrue(yarnState.pendingProcessors.containsKey(PROCESSOR_ID))
+    val yarnCtr = yarnState.pendingProcessors.get(PROCESSOR_ID)
+    assertEquals(YARN_CONTAINER_ID, yarnCtr.id.toString)
+    assertEquals(YARN_CONTAINER_HOST, yarnCtr.nodeId.getHost)
+    assertEquals(YARN_CONTAINER_MEM, yarnCtr.resource.getMemory)
+    assertEquals(YARN_CONTAINER_VCORE, yarnCtr.resource.getVirtualCores)
+  }
+
+  def getMockContainer: Container = {
+    val container = Mockito.mock(classOf[Container])
+
+    val containerId = Mockito.mock(classOf[ContainerId])
+    Mockito.when(containerId.toString).thenReturn(YARN_CONTAINER_ID)
+    Mockito.when(container.getId).thenReturn(containerId)
+
+    val resource = Mockito.mock(classOf[Resource])
+    Mockito.when(resource.getMemory).thenReturn(YARN_CONTAINER_MEM)
+    Mockito.when(resource.getVirtualCores).thenReturn(YARN_CONTAINER_VCORE)
+    Mockito.when(container.getResource).thenReturn(resource)
+
+    val nodeId = Mockito.mock(classOf[NodeId])
+    Mockito.when(nodeId.getHost).thenReturn(YARN_CONTAINER_HOST)
+    Mockito.when(container.getNodeId).thenReturn(nodeId)
+
+    container
+  }
 }