diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index fa5f783..88be21f 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -22,6 +22,7 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -348,6 +349,16 @@
   }
 
   /**
+   * Requests a resource from the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResource(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -359,6 +370,18 @@
   }
 
   /**
+   * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, preferredHost, delay, faultDomains);
+    issueResourceRequest(request);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -369,6 +392,17 @@
   }
 
   /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
    * plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
@@ -380,6 +414,19 @@
     return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay), faultDomains);
+  }
+
   public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
@@ -388,6 +435,9 @@
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.faultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 
   /**
@@ -480,5 +530,8 @@
     } else {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 24130fd..5fcf328 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
@@ -88,8 +89,9 @@
 
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager, Config config) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
+    Preconditions.checkNotNull(faultDomainManager, "Fault domain manager cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
@@ -100,7 +102,7 @@
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager, config, faultDomainManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 995cf7d..143e0b3 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -149,6 +149,9 @@
     ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
     this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
 
+    FaultDomainManagerFactory faultDomainManagerFactory = getFaultDomainManagerFactory(clusterManagerConfig);
+    FaultDomainManager faultDomainManager = checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, registry));
+
     // Initialize metrics
     this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.jvmMetrics = new JvmMetrics(registry);
@@ -172,8 +175,8 @@
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled(), localityManager);
+    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager,
+            hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config);
 
     this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
@@ -649,6 +652,26 @@
   }
 
   /**
+   * Returns an instantiated {@link FaultDomainManagerFactory} from a {@link ClusterManagerConfig}. The
+   * {@link FaultDomainManagerFactory} is used to return an implementation of a {@link FaultDomainManager}
+   *
+   * @param clusterManagerConfig, the cluster manager config to parse.
+   *
+   */
+  private FaultDomainManagerFactory getFaultDomainManagerFactory(final ClusterManagerConfig clusterManagerConfig) {
+    final String faultDomainManagerFactoryClass = clusterManagerConfig.getFaultDomainManagerClass();
+    final FaultDomainManagerFactory factory;
+
+    try {
+      factory = ReflectionUtil.getObj(faultDomainManagerFactoryClass, FaultDomainManagerFactory.class);
+    } catch (Exception e) {
+      LOG.error("Error creating the fault domain manager.", e);
+      throw new SamzaException(e);
+    }
+    return factory;
+  }
+
+  /**
    * Obtains the ID of the Samza processor pending launch on the provided resource (container).
    *
    * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index b849ea5..a07a924 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +29,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -56,8 +59,13 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+  // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final boolean isFaultDomainAwareStandbyEnabled;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+                                 LocalityManager localityManager, Config config, FaultDomainManager faultDomainManager) {
     this.failovers = new ConcurrentHashMap<>();
     this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
@@ -70,6 +78,9 @@
         .forEach(containerId -> standbyContainerConstraints.put(containerId,
             StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
     this.clusterResourceManager = clusterResourceManager;
+    this.faultDomainManager = faultDomainManager;
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.isFaultDomainAwareStandbyEnabled = clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
 
     log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
   }
@@ -125,7 +136,9 @@
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     } else {
       initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
     }
@@ -157,6 +170,22 @@
   }
 
   /**
+   * This method removes the fault domain of the host passed as an argument, from the set of fault domains, and then returns it.
+   * The set of fault domains returned is based on the set difference between all the available fault domains in the
+   * cluster and the fault domain associated with the host that is passed as input.
+   * @param hostToAvoid hostname whose fault domains are excluded
+   * @return The set of fault domains which excludes the fault domain that the given host is on
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String hostToAvoid) {
+    Set<FaultDomain> allFaultDomains = faultDomainManager.getAllFaultDomains();
+    Set<FaultDomain> faultDomainToAvoid = Optional.ofNullable(hostToAvoid)
+            .map(faultDomainManager::getFaultDomainsForHost)
+            .orElse(Collections.emptySet());
+    allFaultDomains.removeAll(faultDomainToAvoid);
+    return allFaultDomains;
+  }
+
+  /**
    *  If a standby container has stopped, then there are two possible cases
    *    Case 1. during a failover, the standby container was stopped for an active's start, then we
    *       1. request a resource on the standby's host to place the activeContainer, and
@@ -181,24 +210,29 @@
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      requestResource(containerAllocator, standbyContainerID, ResourceRequestState.ANY_HOST, Duration.ZERO, standbyContainerHostname);
     } else {
       log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
           standbyContainerID, preferredHost);
-      containerAllocator.requestResourceWithDelay(standbyContainerID, preferredHost, preferredHostRetryDelay);
+      String activeContainerHost = getActiveContainerHost(standbyContainerID)
+              .orElse(null);
+      requestResource(containerAllocator, standbyContainerID, preferredHost, preferredHostRetryDelay, activeContainerHost);
     }
   }
 
   /** Method to handle standby-aware allocation for an active container.
    *  We try to find a standby host for the active container, and issue a stop on any standby-containers running on it,
    *  request resource to place the active on the standby's host, and one to place the standby elsewhere.
-   *
+   *  When requesting for resources,
+   *  NOTE: When rack awareness is turned on, we always pass the <code>hostToAvoid</> parameter as null for the {@link #requestResource} method used here
+   *  because the hostname of the previous active processor that died does not exist in the running or pending container list anymore.
+   *  However, different racks will always be guaranteed through {@link #checkStandbyConstraintsAndRunStreamProcessor}.
    * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
    */
@@ -235,7 +269,7 @@
 
         // record the resource request, before issuing it to avoid race with allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, standbyHost);
         failoverMetadata.recordResourceRequest(resourceRequestForActive);
         containerAllocator.issueResourceRequest(resourceRequestForActive);
         samzaApplicationState.failoversToStandby.incrementAndGet();
@@ -281,7 +315,7 @@
     Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID);
 
     // Iterate over the list of running standby containers, to find a standby resource that we have not already
-    // used for a failover for this active resoruce
+    // used for a failover for this active resource
     for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
 
       if (samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
@@ -307,7 +341,7 @@
               .map(ProcessorLocality::host)
               .orElse(null);
 
-      if (StringUtils.isNotBlank(standbyHost)) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", standbyContainerID);
       } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
@@ -361,8 +395,44 @@
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain aware or not, and requests resources accordingly.
    *
+   * @param containerAllocator ContainerAllocator object that requests for resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param preferredHostRetryDelay the {@link Duration} to add to the request timestamp
+   * @param hostToAvoid The hostname to avoid requesting this resource on if fault domain aware standby allocation is enabled
+   */
+  void requestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost, Duration preferredHostRetryDelay, String hostToAvoid) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID) && isFaultDomainAwareStandbyEnabled) {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, getAllowedFaultDomainsGivenHostToAvoid(hostToAvoid));
+    } else {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, new HashSet<>());
+    }
+  }
+
+  /**
+   * This method returns the active container host given a standby or active container ID.
+   *
+   * @param containerID Standby or active container container ID
+   * @return The active container host
+   */
+  Optional<String> getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    return Optional.ofNullable(resource)
+            .map(SamzaResource::getHost);
+  }
+
+  /**
+   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This includes the check that a standby and its active should not be on the same fault domain or the same host.
    * @param containerIdToStart logical id of the container to start
    * @param host potential host to start the container on
    * @return
@@ -375,17 +445,33 @@
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
     }
@@ -409,16 +495,21 @@
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      if (isFaultDomainAwareStandbyEnabled && StandbyTaskUtil.isStandbyContainer(containerID)) {
+        samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
+      }
     } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       // This resource cannot be used to launch this standby container, so we make a new anyhost request
       log.info(
           "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
-      // This resource cannot be used to launch this active container container, so we initiate a failover
+      // This resource cannot be used to launch this active container, so we initiate a failover
       log.warn(
           "Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
@@ -469,7 +560,6 @@
     resourceRequestState.cancelResourceRequest(request);
   }
 
-
   // Handle an expired resource request that was made for placing a standby container
   private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
       Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
@@ -486,7 +576,9 @@
       // If there is no alternative-resource for the standby container we make a new anyhost request
       log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
       resourceRequestState.cancelResourceRequest(request);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     }
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 50a1ee1..e0b0739 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -88,6 +88,7 @@
     configMap.put("task.inputs", "kafka.topic1");
     configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
     configMap.put("samza.cluster-manager.factory", "org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("cluster-manager.fault-domain-manager.factory", "org.apache.samza.clustermanager.MockFaultDomainManagerFactory");
     configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
 
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2b4a4b0..2c9ba81 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -64,6 +64,7 @@
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
   private ContainerManager containerManager;
 
@@ -89,7 +90,7 @@
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -369,7 +370,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
+        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -416,7 +417,7 @@
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
     // Request Preferred Resources
@@ -460,7 +461,7 @@
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
@@ -513,7 +514,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index ac5d6f3..1f063d7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -62,6 +62,7 @@
 
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
   private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
 
   private CoordinatorStreamStore coordinatorStreamStore;
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
@@ -83,7 +84,7 @@
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager, faultDomainManager, config));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -179,9 +180,11 @@
     });
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, manager, false,
+        false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
-            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            containerManager,
             override);
     MockContainerAllocatorWithoutHostAffinity mockAllocator =
         (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
@@ -331,7 +334,7 @@
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index c781f4d..e5ead9e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -141,13 +141,14 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     localityManager = mock(LocalityManager.class);
     when(localityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of(
             "0", new ProcessorLocality("0", "host-1"),
             "1", new ProcessorLocality("1", "host-2"))));
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
@@ -171,9 +172,10 @@
     state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager, false);
@@ -552,8 +554,9 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
@@ -666,8 +669,9 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager, faultDomainManager, config);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state,
             containerManager);
@@ -801,8 +805,9 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index bcbe53f..ad45c5e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,11 +141,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -592,6 +593,7 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
 
     if (withHostAffinity) {
@@ -604,7 +606,7 @@
 
     ContainerManager containerManager =
         buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager);
+            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -614,7 +616,7 @@
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator),
-            mockLocalityManager, false);
+            mockLocalityManager, false, faultDomainManager);
 
     // start triggers a request
     cpm.start();
@@ -755,11 +757,12 @@
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("1", "host1"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -792,11 +795,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -806,7 +810,7 @@
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager,
-            Optional.of(allocator), mockLocalityManager, false));
+            Optional.of(allocator), mockLocalityManager, false, faultDomainManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -1031,16 +1035,16 @@
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
       boolean hostAffinityEnabled, boolean standByEnabled) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     return buildContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+        hostAffinityEnabled, standByEnabled, mockLocalityManager, faultDomainManager);
   }
 
   private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
-      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
-    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, localityManager);
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, boolean hostAffinityEnabled,
+      boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager) {
+    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager, hostAffinityEnabled, standByEnabled, localityManager, faultDomainManager, config);
   }
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
@@ -1050,16 +1054,17 @@
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, boolean restartContainer) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator,
-        mockLocalityManager, restartContainer);
+        mockLocalityManager, restartContainer, faultDomainManager);
   }
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager,
-      boolean restartContainers) {
+      boolean restartContainers, FaultDomainManager faultDomainManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
         allocator, buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager), localityManager, restartContainers);
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager, faultDomainManager), localityManager, restartContainers);
   }
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index fa784e0..ccdd00f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -241,6 +241,7 @@
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
@@ -261,15 +262,15 @@
       Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
       boolean relaxLocality = true;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, null, null, capability, priority, relaxLocality, nodeLabelsExpression);
+          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
       issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
     } else {
       String[] nodes = {preferredHost};
       Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
       boolean relaxLocality = false;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, Arrays.toString(nodes), null, capability, priority, relaxLocality, nodeLabelsExpression);
-      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, null, priority, relaxLocality, nodeLabelsExpression);
+          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
     }
     // ensure that updating the state and making the request are done atomically.
     synchronized (lock) {
