SAMZA-2439: Remove LocalityManager and container location information from JobModel (#1421)

Issues
Currently locality information is part of job model. Job model typically is immutable and fixed within the lifecycle of an application attempt. The locality information on the other hand is dynamic and changes in the event of container movements. Due to this difference, it makes it complicated to program, model or define semantics around these models when building features. Furthermore, by removing this dependency

- Enables us to move JobModel to public APIs and expose it in JobContext
- Enables us to cache and serve serialized JobModel from the AM servlet to reduce AM overhead (memory, open connections, num threads) during container startup, esp. for jobs with a large number of containers (See: #1241)
- Removes tech debt: models should be immutable, and should not update themselves.
- Removes tech debt: makes current container location a first class concept for container scheduling / placement , and for tools like dashboard, samza-rest, auto-scaling, diagnostics etc.

Changes
- Separated out locality information out of job model into LocalityModel
- Introduced an endpoint in AM to serve locality information
- Added Json MixIns for locality models (LocalityModel & ContainerLocality)
- Moved JobModel to samza-api and exposed through JobContext

API Changes:
- Introduced new models for locality.
- Previous job model endpoint will no longer serve locality information. i.e. tools using these will need to update to use the new endpoint.
- Expose JobModel via JobContext
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
index 8e41980..7166446 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
@@ -19,6 +19,7 @@
 package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.job.model.JobModel;
 
 
 /**
@@ -46,4 +47,9 @@
    * @return the id for this job
    */
   String getJobId();
+
+  /**
+   * @return the {@link JobModel} for the job
+   */
+  JobModel getJobModel();
 }
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
new file mode 100644
index 0000000..d1f5e72
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.config.Config;
+
+/**
+ * <p>
+ * The data model used to represent a Samza job. The model is used in the job
+ * coordinator and SamzaContainer to determine how to execute Samza jobs.
+ * </p>
+ *
+ * <p>
+ * The hierarchy for a Samza's job data model is that jobs have containers, and
+ * containers have tasks. Each data model contains relevant information, such as
+ * an id, partition information, etc.
+ * </p>
+ */
+public class JobModel {
+  private final Config config;
+  private final Map<String, ContainerModel> containers;
+
+  public int maxChangeLogStreamPartitions;
+
+  public JobModel(Config config, Map<String, ContainerModel> containers) {
+    this.config = config;
+    this.containers = Collections.unmodifiableMap(containers);
+
+    // Compute the number of change log stream partitions as the maximum partition-id
+    // of all total number of tasks of the job; Increment by 1 because partition ids
+    // start from 0 while we need the absolute count.
+    this.maxChangeLogStreamPartitions = 0;
+    for (ContainerModel container: containers.values()) {
+      for (TaskModel task: container.getTasks().values()) {
+        if (this.maxChangeLogStreamPartitions < task.getChangelogPartition().getPartitionId() + 1)
+          this.maxChangeLogStreamPartitions = task.getChangelogPartition().getPartitionId() + 1;
+      }
+    }
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public Map<String, ContainerModel> getContainers() {
+    return containers;
+  }
+
+  @Override
+  public String toString() {
+    return "JobModel [config=" + config + ", containers=" + containers + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((config == null) ? 0 : config.hashCode());
+    result = prime * result + ((containers == null) ? 0 : containers.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JobModel other = (JobModel) obj;
+    if (config == null) {
+      if (other.config != null)
+        return false;
+    } else if (!config.equals(other.config))
+      return false;
+    if (containers == null) {
+      if (other.containers != null)
+        return false;
+    } else if (!containers.equals(other.containers))
+      return false;
+    return true;
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
new file mode 100644
index 0000000..7775434
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. This locality information is used
+ * to place the container, if possible, on the same host that it was last seen. By doing this, stateful applications
+ * can minimize the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private final Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of processor localities.
+   * @param processorLocalities host locality information for the job keyed by processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = ImmutableMap.copyOf(processorLocalities);
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ProcessorLocality} keyed by processors id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;
+  }
+
+  /*
+   * Returns the {@link ProcessorLocality} for the given container processorId.
+   */
+  public ProcessorLocality getProcessorLocality(String processorId) {
+    return processorLocalities.get(processorId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LocalityModel)) {
+      return false;
+    }
+    LocalityModel that = (LocalityModel) o;
+    return Objects.deepEquals(processorLocalities, that.processorLocalities);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorLocalities);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
new file mode 100644
index 0000000..3478568
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {
+  /* Processor identifier. In YARN deployment model, this corresponds to the logical container id */
+  private final String id;
+  /* Host on which the processor is currently placed */
+  private final String host;
+  private final String jmxUrl;
+  /* JMX tunneling URL for debugging */
+  private final String jmxTunnelingUrl;
+
+  public ProcessorLocality(String id, String host) {
+    this(id, host, "", "");
+  }
+
+  public ProcessorLocality(String id, String host, String jmxUrl, String jmxTunnelingUrl) {
+    this.id = id;
+    this.host = host;
+    this.jmxUrl = jmxUrl;
+    this.jmxTunnelingUrl = jmxTunnelingUrl;
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String host() {
+    return host;
+  }
+
+  public String jmxUrl() {
+    return jmxUrl;
+  }
+
+  public String jmxTunnelingUrl() {
+    return jmxTunnelingUrl;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessorLocality that = (ProcessorLocality) o;
+    return Objects.equals(id, that.id)
+        && Objects.equals(host, that.host)
+        && Objects.equals(jmxUrl, that.jmxUrl)
+        && Objects.equals(jmxTunnelingUrl, that.jmxTunnelingUrl);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, host, jmxUrl, jmxTunnelingUrl);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 8482a3b..68e2f77 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -44,6 +44,7 @@
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
@@ -54,6 +55,7 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -174,6 +176,7 @@
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;
 
   /**
    * Internal variable for the instance of {@link JmxServer}
@@ -215,6 +218,8 @@
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
     this.jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
+    this.localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
 
     // build metastore for container placement messages
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore);
@@ -343,6 +348,7 @@
       systemAdmins.stop();
       shutDowncontainerPlacementRequestAllocatorAndUtils();
       containerProcessManager.stop();
+      localityManager.close();
       metadataStore.close();
     } catch (Throwable e) {
       LOG.error("Exception while stopping cluster based job coordinator", e);
@@ -457,7 +463,7 @@
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 2730c0c..24130fd 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
@@ -27,10 +28,11 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
 import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.util.BoundedLinkedHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,19 +84,23 @@
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;
+
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
     this.placementRequestsCache = new BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
     this.hostAffinityEnabled = hostAffinityEnabled;
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+    this.localityManager = localityManager;
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
@@ -529,7 +535,9 @@
           processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+      sourceHost = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+          .map(ProcessorLocality::host)
+          .orElse(null);
       LOG.info("Processor ID: {} is not running and was last seen on host: {} for ContainerPlacement action: {}",
           processorId, sourceHost, requestMessage);
     }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f6e3b1f..1bc1669 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -26,6 +27,7 @@
 import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
@@ -33,9 +35,11 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -103,6 +107,8 @@
 
   private final Option<DiagnosticsManager> diagnosticsManager;
 
+  private final LocalityManager localityManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -130,7 +136,8 @@
   private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
-      ContainerPlacementMetadataStore metadataStore) {
+      ContainerPlacementMetadataStore metadataStore, LocalityManager localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
     this.jobConfig = new JobConfig(config);
@@ -159,11 +166,12 @@
       diagnosticsManager = Option.empty();
     }
 
+    this.localityManager = localityManager;
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
     this.containerManager = new ContainerManager(metadataStore, state, clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled());
+        jobConfig.getStandbyTasksEnabled(), localityManager);
 
     this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
@@ -176,7 +184,8 @@
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
       Optional<ContainerAllocator> allocator,
-      ContainerManager containerManager) {
+      ContainerManager containerManager,
+      LocalityManager localityManager) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -186,6 +195,7 @@
     this.clusterResourceManager = resourceManager;
     this.containerManager = containerManager;
     this.diagnosticsManager = Option.empty();
+    this.localityManager = localityManager;
     this.containerAllocator = allocator.orElseGet(
       () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
           hostAffinityEnabled, this.containerManager));
@@ -233,8 +243,16 @@
     state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
 
     // Request initial set of containers
-    Map<String, String> processorToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
-    containerAllocator.requestResources(processorToHostMapping);
+    LocalityModel localityModel = localityManager.readLocality();
+    Map<String, String> processorToHost = new HashMap<>();
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotBlank)
+          .orElse(null);
+      processorToHost.put(containerId, host);
+    });
+    containerAllocator.requestResources(processorToHost);
 
     // Start container allocator thread
     LOG.info("Starting the container allocator thread");
@@ -476,8 +494,10 @@
 
     state.neededProcessors.incrementAndGet();
     // Find out previously running container location
-    String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
-    if (!hostAffinityEnabled || lastSeenOn == null) {
+    String lastSeenOn = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
       lastSeenOn = ResourceRequestState.ANY_HOST;
     }
     LOG.info("Container ID: {} for Processor ID: {} was last seen on host {}.", containerId, processorId, lastSeenOn);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 30d0de9..b849ea5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -26,8 +26,10 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,8 @@
 
   private final SamzaApplicationState samzaApplicationState;
 
+  private final LocalityManager localityManager;
+
   // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
   // This is used for checking no two standbys or active-standby-pair are started on the same host
   private final Map<String, List<String>> standbyContainerConstraints;
@@ -53,8 +57,9 @@
   private ClusterResourceManager clusterResourceManager;
 
   public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager) {
+      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
     this.failovers = new ConcurrentHashMap<>();
+    this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
     this.samzaApplicationState = samzaApplicationState;
     JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
@@ -297,12 +302,13 @@
     // We iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried
     for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
 
-      String standbyHost = this.samzaApplicationState.jobModelManager.jobModel().
-          getContainerToHostValue(standbyContainerID, SetContainerHostMapping.HOST_KEY);
+      String standbyHost =
+          Optional.ofNullable(localityManager.readLocality().getProcessorLocality(standbyContainerID))
+              .map(ProcessorLocality::host)
+              .orElse(null);
 
-      if (standbyHost == null || standbyHost.isEmpty()) {
+      if (StringUtils.isNotBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", standbyContainerID);
-
       } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
         log.info("Not using standby host {} for active container {} because it had already been selected", standbyHost,
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 34baad0..6f6951f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,18 +19,22 @@
 
 package org.apache.samza.container;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.serializers.Serde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Used for persisting and reading the container-to-host assignment information into the metadata store.
+ * Used for persisting and reading the locality information into the metadata store. Currently, we store the
+ * processor-to-host assignment.
  * */
 public class LocalityManager {
   private static final Logger LOG = LoggerFactory.getLogger(LocalityManager.class);
@@ -53,47 +57,46 @@
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the processor locality information from the {@link MetadataStore}. In YARN deployment model, the
+   * processor refers to the samza container.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {
-    Map<String, Map<String, String>> allMappings = new HashMap<>();
-    metadataStore.all().forEach((containerId, valueBytes) -> {
+  public LocalityModel readLocality() {
+    Map<String, ProcessorLocality> containerLocalityMap = new HashMap<>();
+
+    metadataStore.all().forEach((processorId, valueBytes) -> {
       if (valueBytes != null) {
         String locationId = valueSerde.fromBytes(valueBytes);
-        Map<String, String> values = new HashMap<>();
-        values.put(SetContainerHostMapping.HOST_KEY, locationId);
-        allMappings.put(containerId, values);
+        containerLocalityMap.put(processorId, new ProcessorLocality(processorId, locationId));
       }
     });
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<String, Map<String, String>> entry : allMappings.entrySet()) {
-        LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
+      for (Map.Entry<String, ProcessorLocality> entry : containerLocalityMap.entrySet()) {
+        LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue().host()));
       }
     }
 
-    return Collections.unmodifiableMap(allMappings);
+    return new LocalityModel(containerLocalityMap);
   }
 
   /**
    * Method to write locality information to the {@link MetadataStore}. This method is used in {@link SamzaContainer}.
    *
-   * @param containerId  the {@link SamzaContainer} ID
+   * @param processorId a.k.a logical container ID
    * @param hostName  the hostname
    */
-  public void writeContainerToHostMapping(String containerId, String hostName) {
-    Map<String, Map<String, String>> containerToHostMapping = readContainerLocality();
-    Map<String, String> existingMappings = containerToHostMapping.get(containerId);
-    String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
-    if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
-      LOG.info("Container {} moved from {} to {}", containerId, existingHostMapping, hostName);
+  public void writeContainerToHostMapping(String processorId, String hostName) {
+    String existingHostMapping = Optional.ofNullable(readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (StringUtils.isNotBlank(existingHostMapping) && !existingHostMapping.equals(hostName)) {
+      LOG.info("Container {} moved from {} to {}", processorId, existingHostMapping, hostName);
     } else {
-      LOG.info("Container {} started at {}", containerId, hostName);
+      LOG.info("Container {} started at {}", processorId, hostName);
     }
 
-    metadataStore.put(containerId, valueSerde.toBytes(hostName));
+    metadataStore.put(processorId, valueSerde.toBytes(hostName));
     metadataStore.flush();
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
index ee6b492..4199d39 100644
--- a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
@@ -20,17 +20,20 @@
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.job.model.JobModel;
 
 
 public class JobContextImpl implements JobContext {
   private final Config config;
+  private final JobModel jobModel;
   private final String jobName;
   private final String jobId;
 
-  private JobContextImpl(Config config, String jobName, String jobId) {
+  private JobContextImpl(Config config, String jobName, String jobId, JobModel jobModel) {
     this.config = config;
     this.jobName = jobName;
     this.jobId = jobId;
+    this.jobModel = jobModel;
   }
 
   /**
@@ -38,15 +41,16 @@
    * This extracts some information like job name and job id.
    *
    * @param config used to extract job information
+   * @param jobModel job model
    * @return {@link JobContextImpl} corresponding to the {@code config}
    * @throws IllegalArgumentException if job name is not defined in the {@code config}
    */
-  public static JobContextImpl fromConfigWithDefaults(Config config) {
+  public static JobContextImpl fromConfigWithDefaults(Config config, JobModel jobModel) {
     JobConfig jobConfig = new JobConfig(config);
     String jobName = jobConfig.getName()
         .orElseThrow(() -> new IllegalArgumentException(String.format("Config %s is missing", JobConfig.JOB_NAME)));
     String jobId = jobConfig.getJobId();
-    return new JobContextImpl(config, jobName, jobId);
+    return new JobContextImpl(config, jobName, jobId, jobModel);
   }
 
   @Override
@@ -63,4 +67,9 @@
   public String getJobId() {
     return this.jobId;
   }
+
+  @Override
+  public JobModel getJobModel() {
+    return this.jobModel;
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
new file mode 100644
index 0000000..24e12ff
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model and configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements. The locality information is served under
+ * {@link org.apache.samza.coordinator.JobModelManager#server()}/locality. The server and the port information are
+ * dynamic and is determined at the start AM. YARN dashboard or job-coordinator logs contains the server
+ * and the port information.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      ProcessorLocality processorLocality = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
+          .orElse(new ProcessorLocality(processorId, ""));
+      mapper.writeValue(response.getWriter(), processorLocality);
+    } else {
+      mapper.writeValue(response.getWriter(), localityModel);
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index f55f02f..79b52e6 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -97,7 +97,6 @@
     }
 
     // 2. create the necessary streams
-    // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
     // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
     // to be used for the whole application
     JobConfig jobConfig = jobConfigs.get(0);
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index c51fd85..9dd85b5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -48,7 +48,6 @@
   @Override
   public List<JobConfig> prepareJobs() {
     // for high-level DAG, generate the plan and job configs
-    // TODO: run.id needs to be set for standalone: SAMZA-1531
     // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
     String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
     LOG.info("The run id for this run is {}", runId);
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
deleted file mode 100644
index be26f10..0000000
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.model;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-
-/**
- * <p>
- * The data model used to represent a Samza job. The model is used in the job
- * coordinator and SamzaContainer to determine how to execute Samza jobs.
- * </p>
- *
- * <p>
- * The hierarchy for a Samza's job data model is that jobs have containers, and
- * containers have tasks. Each data model contains relevant information, such as
- * an id, partition information, etc.
- * </p>
- */
-public class JobModel {
-  private static final String EMPTY_STRING = "";
-  private final Config config;
-  private final Map<String, ContainerModel> containers;
-
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
-  public int maxChangeLogStreamPartitions;
-
-  public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
-    this.config = config;
-    this.containers = Collections.unmodifiableMap(containers);
-    this.localityManager = localityManager;
-
-    // initialize container localityMappings
-    this.localityMappings = new HashMap<>();
-    if (localityManager == null) {
-      for (String containerId : containers.keySet()) {
-        localityMappings.put(containerId, null);
-      }
-    } else {
-      populateContainerLocalityMappings();
-    }
-
-
-    // Compute the number of change log stream partitions as the maximum partition-id
-    // of all total number of tasks of the job; Increment by 1 because partition ids
-    // start from 0 while we need the absolute count.
-    this.maxChangeLogStreamPartitions = 0;
-    for (ContainerModel container: containers.values()) {
-      for (TaskModel task: container.getTasks().values()) {
-        if (this.maxChangeLogStreamPartitions < task.getChangelogPartition().getPartitionId() + 1)
-          this.maxChangeLogStreamPartitions = task.getChangelogPartition().getPartitionId() + 1;
-      }
-    }
-  }
-
-  public Config getConfig() {
-    return config;
-  }
-
-  /**
-   * Returns the container to host mapping for a given container ID and mapping key
-   *
-   * @param containerId the ID of the container
-   * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
-   * @return the value if it exists for a given container and key, otherwise an empty string
-   */
-  public String getContainerToHostValue(String containerId, String key) {
-    if (localityManager == null) {
-      return EMPTY_STRING;
-    }
-    final Map<String, String> mappings = localityManager.readContainerLocality().get(containerId);
-    if (mappings == null) {
-      return EMPTY_STRING;
-    }
-    if (!mappings.containsKey(key)) {
-      return EMPTY_STRING;
-    }
-    return mappings.get(key);
-  }
-
-  public Map<String, String> getAllContainerToHostValues(String key) {
-    if (localityManager == null) {
-      return Collections.EMPTY_MAP;
-    }
-    Map<String, String> allValues = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
-      String value = entry.getValue().get(key);
-      if (value != null) {
-        allValues.put(entry.getKey(), value);
-      }
-    }
-    return allValues;
-  }
-
-  private void populateContainerLocalityMappings() {
-    Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
-    for (String containerId: containers.keySet()) {
-      if (allMappings.containsKey(containerId)) {
-        localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
-      } else {
-        localityMappings.put(containerId, null);
-      }
-    }
-  }
-
-  public Map<String, String> getAllContainerLocality() {
-    if (localityManager != null) {
-      populateContainerLocalityMappings();
-    }
-    return localityMappings;
-  }
-
-  public Map<String, ContainerModel> getContainers() {
-    return containers;
-  }
-
-  @Override
-  public String toString() {
-    return "JobModel [config=" + config + ", containers=" + containers + "]";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((config == null) ? 0 : config.hashCode());
-    result = prime * result + ((containers == null) ? 0 : containers.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    JobModel other = (JobModel) obj;
-    if (config == null) {
-      if (other.config != null)
-        return false;
-    } else if (!config.equals(other.config))
-      return false;
-    if (containers == null) {
-      if (other.containers != null)
-        return false;
-    } else if (!containers.equals(other.containers))
-      return false;
-    return true;
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ed0c875..47c1754 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -405,7 +405,7 @@
     }
 
     return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
-        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
+        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
         Option.apply(this.externalContextOptional.orElse(null)), null, startpointManager,
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 4470ae7..a5148fb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -128,7 +128,7 @@
           containerId, jobModel,
           ScalaJavaUtil.toScalaMap(metricsReporters),
           taskFactory,
-          JobContextImpl.fromConfigWithDefaults(config),
+          JobContextImpl.fromConfigWithDefaults(config, jobModel),
           Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
           Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
           Option.apply(externalContextOptional.orElse(null)),
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
new file mode 100644
index 0000000..79b1367
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import java.util.Map;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.LocalityModel} to/from JSON
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonLocalityModelMixIn {
+  @JsonCreator
+  public JsonLocalityModelMixIn(@JsonProperty("processor-localities") Map<String, ProcessorLocality> processorLocalities) {
+
+  }
+
+  @JsonProperty("processor-localities")
+  abstract Map<String, ProcessorLocality> processorLocalities();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
new file mode 100644
index 0000000..cce573e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.ProcessorLocality} to/from JSON
+ * <b>NOTE:</b> In YARN deployment model, the id refers to the logical container id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonProcessorLocalityMixIn {
+  @JsonCreator
+  public JsonProcessorLocalityMixIn(@JsonProperty("id") String id, @JsonProperty("host") String host,
+      @JsonProperty("jmx-url") String jmxUrl, @JsonProperty("jmx-tunneling-url") String jmxTunnelingUrl) {
+  }
+
+  @JsonProperty("id")
+  abstract String id();
+
+  @JsonProperty("host")
+  abstract String host();
+
+  @JsonProperty("jmx-url")
+  abstract String jmxUrl();
+
+  @JsonProperty("jmx-tunneling-url")
+  abstract String jmxTunnelingUrl();
+}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 694987f..db147f0 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -28,7 +28,9 @@
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStream;
@@ -130,6 +132,11 @@
       }
     });
 
+    mapper.getSerializationConfig().addMixInAnnotations(LocalityModel.class, JsonLocalityModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(LocalityModel.class, JsonLocalityModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index f352bd0..9d1896e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -77,6 +77,7 @@
   private final Map<String, ContainerStorageManager> containerStorageManagers = new HashMap<>();
 
   private int maxPartitionNumber = 0;
+  private JobModel jobModel;
   private Map<String, ContainerModel> containers = new HashMap<>();
 
   /**
@@ -145,6 +146,7 @@
           JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
               coordinatorStreamStore, metricsRegistryMap);
       JobModel jobModel = jobModelManager.jobModel();
+      this.jobModel = jobModel;
       containers = jobModel.getContainers();
     } finally {
       coordinatorStreamStore.close();
@@ -249,7 +251,7 @@
               jobConfig,
               new HashMap<>(),
               new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), ""),
-              JobContextImpl.fromConfigWithDefaults(jobConfig),
+              JobContextImpl.fromConfigWithDefaults(jobConfig, jobModel),
               containerContext,
               new HashMap<>(),
               storeBaseDir,
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 37162c4..c7e7c7c 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,8 +35,7 @@
 import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -95,14 +94,15 @@
       val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
 
       val jobModel: JobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
-      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers, localityManager))
+      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers))
 
       updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
 
       val server = new HttpServer
       server.addServlet("/", new JobServlet(jobModelRef))
+      server.addServlet("/locality", new LocalityServlet(localityManager))
 
-      currentJobModelManager = new JobModelManager(jobModelRef.get(), server, localityManager)
+      currentJobModelManager = new JobModelManager(jobModelRef.get(), server)
       currentJobModelManager
     } finally {
       systemAdmins.stop()
@@ -167,15 +167,18 @@
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = {
     val containerToLocationId: util.Map[String, LocationId] = new util.HashMap[String, LocationId]()
-    val existingContainerLocality = localityManager.readContainerLocality()
+    val existingContainerLocality = localityManager.readLocality().getProcessorLocalities
 
     for (containerId <- 0 until new JobConfig(config).getContainerCount) {
-      val localityMapping = existingContainerLocality.get(containerId.toString)
+      val preferredHost = Option.apply(existingContainerLocality.get(containerId.toString))
+        .map(containerLocality => containerLocality.host())
+        .filter(host => host.nonEmpty)
+        .orNull
       // To handle the case when the container count is increased between two different runs of a samza-yarn job,
       // set the locality of newly added containers to any_host.
       var locationId: LocationId = new LocationId("ANY_HOST")
-      if (localityMapping != null && localityMapping.containsKey(SetContainerHostMapping.HOST_KEY)) {
-        locationId = new LocationId(localityMapping.get(SetContainerHostMapping.HOST_KEY))
+      if (preferredHost != null) {
+        locationId = new LocationId(preferredHost)
       }
       containerToLocationId.put(containerId.toString, locationId)
     }
@@ -366,6 +369,7 @@
 
     // processor list is required by some of the groupers. So, let's pass them as part of the config.
     // Copy the config and add the processor list to the config copy.
+    // TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
     val configMap = new util.HashMap[String, String](config)
     configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality.keySet()))
     val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
@@ -444,12 +448,7 @@
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null,
-
-  /**
-   * LocalityManager employed to read and write container and task locality information to metadata store.
-   */
-  val localityManager: LocalityManager = null) extends Logging {
+  val server: HttpServer = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -466,11 +465,6 @@
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()
-        info("Stopped localityManager")
-      }
     }
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 79bd181..9b5a073 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -149,7 +149,7 @@
         jobModel,
         Map[String, MetricsReporter](),
         taskFactory,
-        JobContextImpl.fromConfigWithDefaults(config),
+        JobContextImpl.fromConfigWithDefaults(config, jobModel),
         Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
         Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
         buildExternalContext(config)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index d5819eb..2b4a4b0 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -36,7 +36,8 @@
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -58,7 +59,7 @@
 public class TestContainerAllocatorWithHostAffinity {
 
   private final Config config = getConfig();
-  private final JobModelManager jobModelManager = initializeJobModelManager(config, 1);
+  private final JobModelManager jobModelManager = initializeJobModelManager(getConfig(), 1);
   private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
@@ -67,14 +68,7 @@
   private ContainerManager containerManager;
 
   private JobModelManager initializeJobModelManager(Config config, int containerCount) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      } });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager,
+    return JobModelManagerTestUtil.getJobModelManager(config, containerCount,
         new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
   }
 
@@ -87,12 +81,15 @@
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc"))));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -372,7 +369,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false);
+        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -419,7 +416,7 @@
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
     // Request Preferred Resources
@@ -463,7 +460,7 @@
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
@@ -516,7 +513,7 @@
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f9104bd..b808296 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -26,10 +26,12 @@
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -72,13 +74,15 @@
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -276,7 +280,7 @@
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockManager, false, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 0ec635d..53bd5b0 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -45,8 +45,8 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -103,6 +103,7 @@
   private ContainerManager containerManager;
   private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
   private ContainerProcessManager cpm;
+  private LocalityManager localityManager;
   private ClusterResourceManager.Callback callback;
 
   private Config getConfig() {
@@ -122,30 +123,8 @@
     return new MapConfig(map);
   }
 
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerIdToHost.size(),
-        mockLocalityManager, this.server);
-  }
-
-  private JobModelManager getJobModelManagerWithHostAffinityWithStandby(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-    // Generate JobModel for standby containers
-    JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 2, 2, Optional.of(mockLocalityManager));
-    return new JobModelManager(standbyJobModel, server, null);
+  private JobModelManager getJobModelManagerWithStandby() {
+    return new JobModelManager(TestStandbyAllocator.getJobModelWithStandby(2, 2, 2), server);
   }
 
   @Before
@@ -159,14 +138,19 @@
     containerPlacementMetadataStore.start();
     // Utils Related to Cluster manager:
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, true));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"))));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+            clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
   }
 
   @After
@@ -177,15 +161,22 @@
   }
 
   public void setupStandby() throws Exception {
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0", "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"),
+            "0-0", new ProcessorLocality("0", "host-2"),
+            "1-0", new ProcessorLocality("0", "host-1"))));
+    state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true, mockLocalityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, mockLocalityManager);
   }
 
   @Test(timeout = 10000)
@@ -558,14 +549,14 @@
   public void testContainerPlacementsForJobRunningInDegradedState() throws Exception {
     // Set failure after retries to false to enable job running in degraded state
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -672,18 +663,18 @@
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfigWithHostAffinityAndRetries(false, 1, true));
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false, localityManager);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state,
             containerManager);
 
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(false, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager);
 
     // Mimic Cluster Manager returning any request
     doAnswer(new Answer<Void>() {
@@ -807,16 +798,16 @@
   @Test(expected = NullPointerException.class)
   public void testBadControlRequestRejected() throws Exception {
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(true, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index a5dbe77..5e550cf 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -38,7 +38,8 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -108,20 +109,7 @@
 
   private HttpServer server = null;
 
-
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(),
-        containerIdToHost.size(), mockLocalityManager, this.server);
-  }
-
-  private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount) {
+  private JobModelManager getJobModelManager(int containerCount) {
     return JobModelManagerTestUtil.getJobModelManager(getConfig(), containerCount, this.server);
   }
 
@@ -149,11 +137,14 @@
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -169,7 +160,7 @@
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1")));
+    state = new SamzaApplicationState(getJobModelManager(1));
     callback = new MockClusterResourceManagerCallback();
     clusterResourceManager = new MockClusterResourceManager(callback, state);
     cpm = new ContainerProcessManager(
@@ -178,7 +169,8 @@
         new MetricsRegistryMap(),
         clusterResourceManager,
         Optional.empty(),
-        containerManager
+        containerManager,
+        mockLocalityManager
     );
 
     allocator =
@@ -192,12 +184,12 @@
   @Test
   public void testOnInit() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     ContainerProcessManager cpm =
@@ -237,7 +229,7 @@
   @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
@@ -260,12 +252,12 @@
   @Test
   public void testCpmShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -308,12 +300,12 @@
   @Test
   public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -408,11 +400,11 @@
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -494,12 +486,22 @@
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, maxRetries, failAfterRetries));
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+
+    if (withHostAffinity) {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
+    } else {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(new HashMap<>()));
+    }
+
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false);
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false, mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -508,7 +510,7 @@
         containerManager);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager);
 
     // start triggers a request
     cpm.start();
@@ -603,12 +605,12 @@
   public void testInvalidNotificationsAreIgnored() throws Exception {
     Config conf = getConfig();
 
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -644,13 +646,16 @@
 
   @Test
   public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     Map<String, String> configMap = new HashMap<>();
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("1", "host1"))));
+    ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -660,7 +665,7 @@
 
     ContainerProcessManager manager =
         new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager);
+            Optional.of(allocator), containerManager, mockLocalityManager);
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -680,12 +685,14 @@
     config.put("cluster-manager.container.request.timeout.ms", "10000");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
-        "1", "host2")));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
+    ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, mockLocalityManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -694,7 +701,7 @@
         containerManager);
 
     ContainerProcessManager cpm =
-        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -744,12 +751,12 @@
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -819,12 +826,12 @@
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -915,10 +922,32 @@
     server.stop();
   }
 
+  private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    return buildContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
+        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+  }
+
+  private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+    return new ContainerManager(containerPlacementMetadataStore, samzaApplicationState, clusterResourceManager,
+        hostAffinityEnabled, standByEnabled, localityManager);
+  }
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    return buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, allocator, mockLocalityManager);
+  }
+
+  private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
+      ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator, LocalityManager localityManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
-        allocator, new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false));
+        allocator, buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager), localityManager);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
index c5f3ec1..cb3a7a7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -22,10 +22,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -40,7 +38,7 @@
 
   @Test
   public void testWithNoStandby() {
-    JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
     List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
     Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
   }
@@ -59,7 +57,7 @@
 
 
   public void testWithStandby(int nContainers, int nTasks, int replicationFactor) {
-    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor);
 
     for (String containerID : jobModel.getContainers().keySet()) {
       List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
@@ -81,7 +79,7 @@
   }
 
   // Helper method to create a jobmodel with given number of containers, tasks and replication factor
-  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor, Optional<LocalityManager> localityManager) {
+  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor) {
     Map<String, ContainerModel> containerModels = new HashMap<>();
     int taskID = 0;
 
@@ -104,7 +102,7 @@
     }
 
     containerModels.putAll(standbyContainerModels);
-    return new JobModel(new MapConfig(), containerModels, localityManager.orElse(null));
+    return new JobModel(new MapConfig(), containerModels);
   }
 
   // Helper method that creates a taskmodel with one input ssp
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index d18ad67..f2819c7 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -34,7 +34,10 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 
 public class TestLocalityManager {
 
@@ -59,7 +62,7 @@
     LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
 
     localityManager.writeContainerToHostMapping("0", "localhost");
-    Map<String, Map<String, String>> localMap = localityManager.readContainerLocality();
+    Map<String, Map<String, String>> localMap = readContainerLocality(localityManager);
     Map<String, Map<String, String>> expectedMap =
       new HashMap<String, Map<String, String>>() {
         {
@@ -87,9 +90,9 @@
 
     localityManager.writeContainerToHostMapping("1", "localhost");
 
-    assertEquals(localityManager.readContainerLocality().size(), 1);
+    assertEquals(readContainerLocality(localityManager).size(), 1);
 
-    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), localityManager.readContainerLocality());
+    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), readContainerLocality(localityManager));
 
     localityManager.close();
 
@@ -98,4 +101,13 @@
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
+
+  static Map<String, Map<String, String>> readContainerLocality(LocalityManager localityManager) {
+    Map<String, Map<String, String>> containerLocalityMap = new HashMap<>();
+    localityManager.readLocality().getProcessorLocalities().forEach((containerId, containerLocality) -> {
+      containerLocalityMap.put(containerId, ImmutableMap.of("host", containerLocality.host()));
+    });
+
+    return containerLocalityMap;
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index 6e29d81..3affa71 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -22,13 +22,9 @@
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.LocationId;
-import org.apache.samza.system.StreamMetadataCache;
 
 /**
  * Utils to create instances of {@link JobModelManager} in unit tests
@@ -36,23 +32,12 @@
 public class JobModelManagerTestUtil {
 
   public static JobModelManager getJobModelManager(Config config, int containerCount, HttpServer server) {
-    return getJobModelManagerWithLocalityManager(config, containerCount, null, server);
-  }
-
-  public static JobModelManager getJobModelManagerWithLocalityManager(Config config, int containerCount, LocalityManager localityManager, HttpServer server) {
     Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
       ContainerModel container = new ContainerModel(String.valueOf(i), new HashMap<>());
       containers.put(String.valueOf(i), container);
     }
-    JobModel jobModel = new JobModel(config, containers, localityManager);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-  public static JobModelManager getJobModelManagerUsingReadModel(Config config, StreamMetadataCache streamMetadataCache,
-      HttpServer server, LocalityManager localityManager, Map<String, LocationId> processorLocality) {
-    JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), streamMetadataCache,
-        new GrouperMetadataImpl(processorLocality, new HashMap<>(), new HashMap<>(), new HashMap<>()));
-    return new JobModelManager(new JobModel(jobModel.getConfig(), jobModel.getContainers(), localityManager), server, localityManager);
+    JobModel jobModel = new JobModel(config, containers);
+    return new JobModelManager(jobModel, server);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 9908da5..83de0cf 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -39,9 +39,10 @@
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.LocationId;
@@ -57,10 +58,12 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatcher;
@@ -104,72 +107,6 @@
   }
 
   @Test
-  public void testLocalityMapWithHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "true");
-      }
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    when(mockLocalityManager.readContainerLocality()).thenReturn(this.localityMappings);
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new LocationId("abc-affinity"));
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), ImmutableMap.of("0", "abc-affinity"));
-  }
-
-  @Test
-  public void testLocalityMapWithoutHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "false");
-      }
-    });
-
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    when(mockLocalityManager.readContainerLocality()).thenReturn(new HashMap<>());
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new LocationId("abc-affinity"));
-
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), Collections.singletonMap("0", null));
-  }
-
-  @Test
   public void testGetGrouperMetadata() {
     // Mocking setup.
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
@@ -179,17 +116,13 @@
     SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new Partition(1));
     SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new Partition(2));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
 
     Map<SystemStreamPartition, List<String>> taskToSSPAssignments = ImmutableMap.of(testSystemStreamPartition1, ImmutableList.of("task-0", "task-1"),
                                                                                     testSystemStreamPartition2, ImmutableList.of("task-2", "task-3"));
 
     Map<String, String> taskAssignment = ImmutableMap.of("task-0", "0");
 
-    // Mock the container locality assignment.
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
-
     // Mock the task to partition assignment.
     when(mockTaskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(taskToSSPAssignments);
 
@@ -199,8 +132,8 @@
 
     GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
-    Mockito.verify(mockTaskAssignmentManager).readTaskAssignment();
+    verify(mockLocalityManager).readLocality();
+    verify(mockTaskAssignmentManager).readTaskAssignment();
 
     Assert.assertEquals(ImmutableMap.of("0", new LocationId("abc-affinity")), grouperMetadata.getProcessorLocality());
     Assert.assertEquals(ImmutableMap.of(new TaskName("task-0"), new LocationId("abc-affinity")), grouperMetadata.getTaskLocality());
@@ -216,15 +149,14 @@
   public void testGetProcessorLocalityAllEntriesExisting() {
     Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "0-affinity"));
-    localityMappings.put("1", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "1-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of(
+        "0", new ProcessorLocality("0", "0-affinity"),
+        "1", new ProcessorLocality("1", "1-affinity"))));
 
     Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected =
         ImmutableMap.of("0", new LocationId("0-affinity"), "1", new LocationId("1-affinity"));
     Assert.assertEquals(expected, processorLocality);
@@ -234,15 +166,13 @@
   public void testGetProcessorLocalityNewContainer() {
     Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    // 2 containers, but only return 1 existing mapping
-    localityMappings.put("0", ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    // 2 containers, but only return 1 existing mapping
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
 
     Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected = ImmutableMap.of(
         // found entry in existing locality
         "0", new LocationId("abc-affinity"),
@@ -291,16 +221,16 @@
     systemStreamPartitions.add(new SystemStreamPartition(new SystemStream("test-system-3", "test-stream-3"), new Partition(2)));
 
     // Verifications
-    Mockito.verify(mockJobModel, atLeast(1)).getContainers();
-    Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-    Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
+    verify(mockJobModel, atLeast(1)).getContainers();
+    verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
+    verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
         ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, "task-3", TaskMode.Active, "task-4", TaskMode.Active)));
 
     // Verify that the old, stale partition mappings had been purged in the coordinator stream.
-    Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
+    verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
 
     // Verify that the new task to partition assignment is stored in the coordinator stream.
-    Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
+    verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
         testSystemStreamPartition1, ImmutableList.of("task-1"),
         testSystemStreamPartition2, ImmutableList.of("task-2"),
         testSystemStreamPartition3, ImmutableList.of("task-3"),
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index ea57479..104b0ba 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -442,7 +442,7 @@
     cms.put(cm0.getId(), cm0);
     cms.put(cm1.getId(), cm1);
 
-    JobModel jobModel = new JobModel(config, cms, null);
+    JobModel jobModel = new JobModel(config, cms);
     Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
     assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
     assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index b86da1f..c13da8d 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -38,9 +39,9 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
@@ -137,7 +138,7 @@
   protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap());
     LocalityManager localityManager = new LocalityManager(coordinatorStreamStore);
-    Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
+    Map<String, ProcessorLocality> containerLocalities = localityManager.readLocality().getProcessorLocalities();
     TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE));
     Map<String, String> taskNameToContainerIdMapping = taskAssignmentManager.readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
@@ -145,7 +146,9 @@
     return taskNameToContainerIdMapping.entrySet()
         .stream()
         .map(entry -> {
-          String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
+          String hostName = Optional.ofNullable(containerLocalities.get(entry.getValue()))
+              .map(ProcessorLocality::host)
+              .orElse(null);
           return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames);
         }).collect(Collectors.toList());
   }
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 46f345d..3f7056e 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -142,7 +142,7 @@
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          JobContextImpl.fromConfigWithDefaults(storageConfig),
+          JobContextImpl.fromConfigWithDefaults(storageConfig, null),
           new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap), StoreMode.ReadWrite
         )
 
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 2b31977..74fef67 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.validation;
 
-import java.util.Map;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -35,17 +35,18 @@
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
-import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.util.CommandLine;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
@@ -158,23 +159,24 @@
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
     coordinatorStreamStore.init();
     try {
-      Config configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-      ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamStore);
-      JobModelManager jobModelManager =
-          JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-              coordinatorStreamStore, metricsRegistry);
+      LocalityManager localityManager =
+          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
       validator.init(config);
-      Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-        String containerId = entry.getKey();
-        String jmxUrl = entry.getValue();
-        log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
-        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-        jmxMetrics.connect();
-        validator.validate(jmxMetrics);
-        jmxMetrics.close();
-        log.info("validate container " + containerId + " successfully");
+      LocalityModel localityModel = localityManager.readLocality();
+
+      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
+        String containerId = processorLocality.id();
+        String jmxUrl = processorLocality.jmxTunnelingUrl();
+        if (StringUtils.isNotBlank(jmxUrl)) {
+          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
+          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+          jmxMetrics.connect();
+          validator.validate(jmxMetrics);
+          jmxMetrics.close();
+          log.info("validate container " + containerId + " successfully");
+        }
       }
+
       validator.complete();
     } finally {
       coordinatorStreamStore.close();
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 7fe6305..d01b20f 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -137,8 +137,7 @@
               %td
                 Up time: #{container.upTimeStr()}
               %td
-                Ordinary: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
-                Tunneling: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+                %a(target="_blank" href="#{state.coordinatorUrl.toString}locality?processorId=#{processorId.toString}") JMX
 
       %h2 Failed Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
new file mode 100644
index 0000000..56d7ae1
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.webapp;
+
+import com.google.common.collect.ImmutableMap;
+import java.net.URL;
+import java.util.Collections;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.coordinator.server.LocalityServlet;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * A test class for {@link LocalityServlet}. It validates the servlet directly and Serde Mix-In of {@link ProcessorLocality}
+ * indirectly.
+ */
+public class TestLocalityServlet {
+  private static final String PROCESSOR_ID1 = "1";
+  private static final String PROCESSOR_ID2 = "2";
+  private static final String HOST1 = "host1";
+  private static final String HOST2 = "host2";
+  private static final String JMX_URL = "jmx";
+  private static final String TUNNELING_URL = "tunneling";
+
+  private static final ProcessorLocality PROCESSOR_1_LOCALITY =
+      new ProcessorLocality(PROCESSOR_ID1, HOST1, JMX_URL, TUNNELING_URL);
+  private static final ProcessorLocality PROCESSOR_2_LOCALITY =
+      new ProcessorLocality("2", HOST2, JMX_URL, TUNNELING_URL);
+
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private HttpServer webApp;
+  private LocalityManager localityManager;
+
+
+
+  @Before
+  public void setup()
+      throws Exception {
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(PROCESSOR_ID1, PROCESSOR_1_LOCALITY, PROCESSOR_ID2, PROCESSOR_2_LOCALITY)));
+    webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet()));
+    webApp.addServlet("/locality", new LocalityServlet(localityManager));
+    webApp.start();
+  }
+
+  @After
+  public void cleanup()
+      throws Exception {
+    webApp.stop();
+  }
+
+  @Test
+  public void testReadContainerLocality() throws Exception {
+    URL url = new URL(webApp.getUrl().toString() + "locality");
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    LocalityModel locality = mapper.readValue(response, LocalityModel.class);
+
+    assertEquals("Expected locality for two containers", 2, locality.getProcessorLocalities().size());
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID1,
+        locality.getProcessorLocality(PROCESSOR_ID1), PROCESSOR_1_LOCALITY);
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID2,
+        locality.getProcessorLocality(PROCESSOR_ID2), PROCESSOR_2_LOCALITY);
+  }
+
+  @Test
+  public void testReadContainerLocalityWithNoLocality() throws Exception {
+    final LocalityModel expectedLocality = new LocalityModel(Collections.emptyMap());
+    URL url = new URL(webApp.getUrl().toString() + "locality");
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    LocalityModel locality = mapper.readValue(response, LocalityModel.class);
+
+    assertEquals("Expected empty response but got " + locality, locality, expectedLocality);
+  }
+
+  @Test
+  public void testReadProcessorLocality() throws Exception {
+    URL url = new URL(webApp.getUrl().toString() + "locality?processorId=" + PROCESSOR_ID1);
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+
+    assertEquals("Mismatch in the locality for processor " + PROCESSOR_ID1,
+        mapper.readValue(response, ProcessorLocality.class), PROCESSOR_1_LOCALITY);
+  }
+
+  @Test
+  public void testReadProcessorLocalityWithNoLocality() throws Exception {
+    final ProcessorLocality expectedProcessorLocality = new ProcessorLocality(PROCESSOR_ID2, "");
+    URL url = new URL(webApp.getUrl().toString() + "locality?processorId=" + PROCESSOR_ID2);
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+    ProcessorLocality processorLocality = mapper.readValue(response, ProcessorLocality.class);
+
+    assertEquals("Expected empty response for processor locality " + PROCESSOR_ID2 + " but got " + processorLocality,
+        processorLocality, expectedProcessorLocality);
+  }
+}