diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index f9438c3..d356baf 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -19,11 +19,14 @@
 package org.apache.samza.job.model;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -44,6 +47,22 @@
   private static final String JOB_MODEL_GENERATION_KEY = "jobModelGeneration/jobModels";
 
   /**
+   * A helper method to fetch the task names associated with the processor from the job model.
+   * @param processorId processor for which task names are fetched
+   * @param jobModel job model
+   * @return a set of {@code TaskName} associated with the processor from the job model.
+   */
+  public static Set<TaskName> getTaskNamesForProcessor(String processorId, JobModel jobModel) {
+    Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "ProcessorId cannot be empty or null");
+
+    return Optional.ofNullable(jobModel.getContainers().get(processorId))
+        .map(ContainerModel::getTasks)
+        .map(Map::keySet)
+        .orElse(Collections.emptySet());
+  }
+
+  /**
    * Extracts the map of {@link SystemStreamPartition}s to {@link TaskName} from the {@link JobModel}
    *
    * @return the extracted map
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1489f67..6526705 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -472,10 +472,8 @@
           JobModel jobModel = getJobModel();
           // start the container with the new model
           if (coordinatorListener != null) {
-            for (ContainerModel containerModel : jobModel.getContainers().values()) {
-              for (TaskName taskName : containerModel.getTasks().keySet()) {
-                zkUtils.writeTaskLocality(taskName, locationId);
-              }
+            for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
+              zkUtils.writeTaskLocality(taskName, locationId);
             }
             coordinatorListener.onNewJobModel(processorId, jobModel);
           }
diff --git a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 960bdf9..0e8baae 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -30,9 +30,52 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 public class TestJobModelUtil {
 
+  @Test
+  public void testGetTaskNamesForProcessorAbsentInJobModel() {
+    JobModel mockJobModel = mock(JobModel.class);
+    when(mockJobModel.getContainers()).thenReturn(mock(Map.class));
+
+    Set<TaskName> taskNames = JobModelUtil.getTaskNamesForProcessor("testProcessor", mockJobModel);
+    assertTrue("TaskNames should be empty", taskNames.isEmpty());
+  }
+
+  @Test
+  public void testGetTaskNamesForProcessorPresentInJobModel() {
+    TaskName expectedTaskName = new TaskName("testTaskName");
+    String processorId = "testProcessor";
+    JobModel mockJobModel = mock(JobModel.class);
+    ContainerModel mockContainerModel = mock(ContainerModel.class);
+    Map<String, ContainerModel> mockContainers = mock(Map.class);
+
+    when(mockContainers.get(processorId)).thenReturn(mockContainerModel);
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(expectedTaskName, mock(TaskModel.class)));
+    when(mockJobModel.getContainers()).thenReturn(mockContainers);
+
+    Set<TaskName> actualTaskNames = JobModelUtil.getTaskNamesForProcessor(processorId, mockJobModel);
+    assertEquals("Expecting TaskNames size = 1", 1, actualTaskNames.size());
+    assertTrue("Expecting testTaskName to be returned", actualTaskNames.contains(expectedTaskName));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetTaskNamesForProcessorWithNullJobModel() {
+    JobModelUtil.getTaskNamesForProcessor("processor", null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithEmptyProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor("", mock(JobModel.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithNullProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor(null, mock(JobModel.class));
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testTaskToSystemStreamPartitionsWithNullJobModel() {
     JobModelUtil.getTaskToSystemStreamPartitions(null);
