SAMZA-2605: Make Standby Container Requests Rack Aware  (#1446)

Feature: The aim of this feature is to make all standby container requests rack aware such that all active containers and their corresponding standby containers are always on different racks. This helps with decreased downtime of applications during rack failures.

One of the requirements of this feature is that the value of job.standbytasks.replication.factor is at max 2 for the rack awareness functionality to be honored.

Changes: This PR uses the FaultDomainManager interface for Yarn to request for rack aware nodes while making standby container requests.

Usage Instructions: For a job with host affinity and standby containers, set the config cluster-manager.fault-domain-aware.standby.enabled to true to enable this feature.
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index fa5f783..88be21f 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -22,6 +22,7 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -348,6 +349,16 @@
   }
 
   /**
+   * Requests a resource from the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResource(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -359,6 +370,18 @@
   }
 
   /**
+   * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   */
+  public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, preferredHost, delay, faultDomains);
+    issueResourceRequest(request);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
@@ -369,6 +392,17 @@
   }
 
   /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost, Set<FaultDomain> faultDomains) {
+    return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
    * plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
@@ -380,6 +414,19 @@
     return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay), faultDomains);
+  }
+
   public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
@@ -388,6 +435,9 @@
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.faultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 
   /**
@@ -480,5 +530,8 @@
     } else {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 24130fd..5fcf328 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
@@ -88,8 +89,9 @@
 
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager, Config config) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
+    Preconditions.checkNotNull(faultDomainManager, "Fault domain manager cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
@@ -100,7 +102,7 @@
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager, config, faultDomainManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 995cf7d..143e0b3 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -149,6 +149,9 @@
     ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
     this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
 
+    FaultDomainManagerFactory faultDomainManagerFactory = getFaultDomainManagerFactory(clusterManagerConfig);
+    FaultDomainManager faultDomainManager = checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, registry));
+
     // Initialize metrics
     this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, state, registry);
     this.jvmMetrics = new JvmMetrics(registry);
@@ -172,8 +175,8 @@
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled(), localityManager);
+    this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager,
+            hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config);
 
     this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
@@ -649,6 +652,26 @@
   }
 
   /**
+   * Returns an instantiated {@link FaultDomainManagerFactory} from a {@link ClusterManagerConfig}. The
+   * {@link FaultDomainManagerFactory} is used to return an implementation of a {@link FaultDomainManager}
+   *
+   * @param clusterManagerConfig, the cluster manager config to parse.
+   *
+   */
+  private FaultDomainManagerFactory getFaultDomainManagerFactory(final ClusterManagerConfig clusterManagerConfig) {
+    final String faultDomainManagerFactoryClass = clusterManagerConfig.getFaultDomainManagerClass();
+    final FaultDomainManagerFactory factory;
+
+    try {
+      factory = ReflectionUtil.getObj(faultDomainManagerFactoryClass, FaultDomainManagerFactory.class);
+    } catch (Exception e) {
+      LOG.error("Error creating the fault domain manager.", e);
+      throw new SamzaException(e);
+    }
+    return factory;
+  }
+
+  /**
    * Obtains the ID of the Samza processor pending launch on the provided resource (container).
    *
    * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index b849ea5..a07a924 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +29,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -56,8 +59,13 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+  // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final boolean isFaultDomainAwareStandbyEnabled;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+                                 LocalityManager localityManager, Config config, FaultDomainManager faultDomainManager) {
     this.failovers = new ConcurrentHashMap<>();
     this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
@@ -70,6 +78,9 @@
         .forEach(containerId -> standbyContainerConstraints.put(containerId,
             StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
     this.clusterResourceManager = clusterResourceManager;
+    this.faultDomainManager = faultDomainManager;
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.isFaultDomainAwareStandbyEnabled = clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
 
     log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
   }
@@ -125,7 +136,9 @@
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     } else {
       initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
     }
@@ -157,6 +170,22 @@
   }
 
   /**
+   * This method removes the fault domain of the host passed as an argument, from the set of fault domains, and then returns it.
+   * The set of fault domains returned is based on the set difference between all the available fault domains in the
+   * cluster and the fault domain associated with the host that is passed as input.
+   * @param hostToAvoid hostname whose fault domains are excluded
+   * @return The set of fault domains which excludes the fault domain that the given host is on
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String hostToAvoid) {
+    Set<FaultDomain> allFaultDomains = faultDomainManager.getAllFaultDomains();
+    Set<FaultDomain> faultDomainToAvoid = Optional.ofNullable(hostToAvoid)
+            .map(faultDomainManager::getFaultDomainsForHost)
+            .orElse(Collections.emptySet());
+    allFaultDomains.removeAll(faultDomainToAvoid);
+    return allFaultDomains;
+  }
+
+  /**
    *  If a standby container has stopped, then there are two possible cases
    *    Case 1. during a failover, the standby container was stopped for an active's start, then we
    *       1. request a resource on the standby's host to place the activeContainer, and
@@ -181,24 +210,29 @@
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      requestResource(containerAllocator, standbyContainerID, ResourceRequestState.ANY_HOST, Duration.ZERO, standbyContainerHostname);
     } else {
       log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
           standbyContainerID, preferredHost);
-      containerAllocator.requestResourceWithDelay(standbyContainerID, preferredHost, preferredHostRetryDelay);
+      String activeContainerHost = getActiveContainerHost(standbyContainerID)
+              .orElse(null);
+      requestResource(containerAllocator, standbyContainerID, preferredHost, preferredHostRetryDelay, activeContainerHost);
     }
   }
 
   /** Method to handle standby-aware allocation for an active container.
    *  We try to find a standby host for the active container, and issue a stop on any standby-containers running on it,
    *  request resource to place the active on the standby's host, and one to place the standby elsewhere.
-   *
+   *  When requesting for resources,
+   *  NOTE: When rack awareness is turned on, we always pass the <code>hostToAvoid</> parameter as null for the {@link #requestResource} method used here
+   *  because the hostname of the previous active processor that died does not exist in the running or pending container list anymore.
+   *  However, different racks will always be guaranteed through {@link #checkStandbyConstraintsAndRunStreamProcessor}.
    * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
    */
@@ -235,7 +269,7 @@
 
         // record the resource request, before issuing it to avoid race with allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, standbyHost);
         failoverMetadata.recordResourceRequest(resourceRequestForActive);
         containerAllocator.issueResourceRequest(resourceRequestForActive);
         samzaApplicationState.failoversToStandby.incrementAndGet();
@@ -281,7 +315,7 @@
     Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID);
 
     // Iterate over the list of running standby containers, to find a standby resource that we have not already
-    // used for a failover for this active resoruce
+    // used for a failover for this active resource
     for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
 
       if (samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
@@ -307,7 +341,7 @@
               .map(ProcessorLocality::host)
               .orElse(null);
 
-      if (StringUtils.isNotBlank(standbyHost)) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", standbyContainerID);
       } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
@@ -361,8 +395,44 @@
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain aware or not, and requests resources accordingly.
    *
+   * @param containerAllocator ContainerAllocator object that requests for resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param preferredHostRetryDelay the {@link Duration} to add to the request timestamp
+   * @param hostToAvoid The hostname to avoid requesting this resource on if fault domain aware standby allocation is enabled
+   */
+  void requestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost, Duration preferredHostRetryDelay, String hostToAvoid) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID) && isFaultDomainAwareStandbyEnabled) {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, getAllowedFaultDomainsGivenHostToAvoid(hostToAvoid));
+    } else {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, preferredHostRetryDelay, new HashSet<>());
+    }
+  }
+
+  /**
+   * This method returns the active container host given a standby or active container ID.
+   *
+   * @param containerID Standby or active container container ID
+   * @return The active container host
+   */
+  Optional<String> getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    return Optional.ofNullable(resource)
+            .map(SamzaResource::getHost);
+  }
+
+  /**
+   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This includes the check that a standby and its active should not be on the same fault domain or the same host.
    * @param containerIdToStart logical id of the container to start
    * @param host potential host to start the container on
    * @return
@@ -375,17 +445,33 @@
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
     }
@@ -409,16 +495,21 @@
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      if (isFaultDomainAwareStandbyEnabled && StandbyTaskUtil.isStandbyContainer(containerID)) {
+        samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
+      }
     } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       // This resource cannot be used to launch this standby container, so we make a new anyhost request
       log.info(
           "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
-      // This resource cannot be used to launch this active container container, so we initiate a failover
+      // This resource cannot be used to launch this active container, so we initiate a failover
       log.warn(
           "Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
@@ -469,7 +560,6 @@
     resourceRequestState.cancelResourceRequest(request);
   }
 
-
   // Handle an expired resource request that was made for placing a standby container
   private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
       Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
@@ -486,7 +576,9 @@
       // If there is no alternative-resource for the standby container we make a new anyhost request
       log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
       resourceRequestState.cancelResourceRequest(request);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     }
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 50a1ee1..e0b0739 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -88,6 +88,7 @@
     configMap.put("task.inputs", "kafka.topic1");
     configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
     configMap.put("samza.cluster-manager.factory", "org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("cluster-manager.fault-domain-manager.factory", "org.apache.samza.clustermanager.MockFaultDomainManagerFactory");
     configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
 
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2b4a4b0..2c9ba81 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -64,6 +64,7 @@
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
   private ContainerManager containerManager;
 
@@ -89,7 +90,7 @@
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -369,7 +370,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
+        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -416,7 +417,7 @@
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
     // Request Preferred Resources
@@ -460,7 +461,7 @@
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
@@ -513,7 +514,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class), faultDomainManager, config));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index ac5d6f3..1f063d7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -62,6 +62,7 @@
 
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
   private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
 
   private CoordinatorStreamStore coordinatorStreamStore;
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
@@ -83,7 +84,7 @@
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager, faultDomainManager, config));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -179,9 +180,11 @@
     });
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, manager, false,
+        false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
-            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            containerManager,
             override);
     MockContainerAllocatorWithoutHostAffinity mockAllocator =
         (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
@@ -331,7 +334,7 @@
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class), faultDomainManager, config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index c781f4d..e5ead9e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -141,13 +141,14 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     localityManager = mock(LocalityManager.class);
     when(localityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of(
             "0", new ProcessorLocality("0", "host-1"),
             "1", new ProcessorLocality("1", "host-2"))));
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
@@ -171,9 +172,10 @@
     state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager, false);
@@ -552,8 +554,9 @@
     state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager, false);
@@ -666,8 +669,9 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager, faultDomainManager, config);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state,
             containerManager);
@@ -801,8 +805,9 @@
         new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager, faultDomainManager, config));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index bcbe53f..ad45c5e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,11 +141,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager, faultDomainManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -592,6 +593,7 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
 
     if (withHostAffinity) {
@@ -604,7 +606,7 @@
 
     ContainerManager containerManager =
         buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager);
+            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -614,7 +616,7 @@
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator),
-            mockLocalityManager, false);
+            mockLocalityManager, false, faultDomainManager);
 
     // start triggers a request
     cpm.start();
@@ -755,11 +757,12 @@
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("1", "host1"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -792,11 +795,12 @@
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
     ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
+            Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -806,7 +810,7 @@
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager,
-            Optional.of(allocator), mockLocalityManager, false));
+            Optional.of(allocator), mockLocalityManager, false, faultDomainManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -1031,16 +1035,16 @@
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
       boolean hostAffinityEnabled, boolean standByEnabled) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     return buildContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+        hostAffinityEnabled, standByEnabled, mockLocalityManager, faultDomainManager);
   }
 
   private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
-      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
-    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, localityManager);
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, boolean hostAffinityEnabled,
+      boolean standByEnabled, LocalityManager localityManager, FaultDomainManager faultDomainManager) {
+    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager, hostAffinityEnabled, standByEnabled, localityManager, faultDomainManager, config);
   }
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
@@ -1050,16 +1054,17 @@
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, boolean restartContainer) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator,
-        mockLocalityManager, restartContainer);
+        mockLocalityManager, restartContainer, faultDomainManager);
   }
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager,
-      boolean restartContainers) {
+      boolean restartContainers, FaultDomainManager faultDomainManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
         allocator, buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager), localityManager, restartContainers);
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager, faultDomainManager), localityManager, restartContainers);
   }
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index fa784e0..ccdd00f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -241,6 +241,7 @@
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
@@ -261,15 +262,15 @@
       Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
       boolean relaxLocality = true;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, null, null, capability, priority, relaxLocality, nodeLabelsExpression);
+          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
       issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
     } else {
       String[] nodes = {preferredHost};
       Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
       boolean relaxLocality = false;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, Arrays.toString(nodes), null, capability, priority, relaxLocality, nodeLabelsExpression);
-      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, null, priority, relaxLocality, nodeLabelsExpression);
+          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
     }
     // ensure that updating the state and making the request are done atomically.
     synchronized (lock) {