SAMZA-2632: Processors should only write their own task locality to zookeeper (#1473)

Problem:
Processors update the task locality for all the tasks in the job model with their own locality regardless of whether the tasks belong to their container model or not.
Description:
As part of SEP 11, host affinity for standalone was introduced. For this feature, we persist the task locality in zookeeper so that subsequent rebalances take this locality into account when generating job model. During job model consensus, we update the task locality for all the tasks and this results in incorrect locality depending on the order of writes.
Changes:
Only update the locality for the tasks that belong to the processor.
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index f9438c3..d356baf 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -19,11 +19,14 @@
 package org.apache.samza.job.model;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -44,6 +47,22 @@
   private static final String JOB_MODEL_GENERATION_KEY = "jobModelGeneration/jobModels";
 
   /**
+   * A helper method to fetch the task names associated with the processor from the job model.
+   * @param processorId processor for which task names are fetched
+   * @param jobModel job model
+   * @return a set of {@code TaskName} associated with the processor from the job model.
+   */
+  public static Set<TaskName> getTaskNamesForProcessor(String processorId, JobModel jobModel) {
+    Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "ProcessorId cannot be empty or null");
+
+    return Optional.ofNullable(jobModel.getContainers().get(processorId))
+        .map(ContainerModel::getTasks)
+        .map(Map::keySet)
+        .orElse(Collections.emptySet());
+  }
+
+  /**
    * Extracts the map of {@link SystemStreamPartition}s to {@link TaskName} from the {@link JobModel}
    *
    * @return the extracted map
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1489f67..6526705 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -472,10 +472,8 @@
           JobModel jobModel = getJobModel();
           // start the container with the new model
           if (coordinatorListener != null) {
-            for (ContainerModel containerModel : jobModel.getContainers().values()) {
-              for (TaskName taskName : containerModel.getTasks().keySet()) {
-                zkUtils.writeTaskLocality(taskName, locationId);
-              }
+            for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
+              zkUtils.writeTaskLocality(taskName, locationId);
             }
             coordinatorListener.onNewJobModel(processorId, jobModel);
           }
diff --git a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 960bdf9..0e8baae 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -30,9 +30,52 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 public class TestJobModelUtil {
 
+  @Test
+  public void testGetTaskNamesForProcessorAbsentInJobModel() {
+    JobModel mockJobModel = mock(JobModel.class);
+    when(mockJobModel.getContainers()).thenReturn(mock(Map.class));
+
+    Set<TaskName> taskNames = JobModelUtil.getTaskNamesForProcessor("testProcessor", mockJobModel);
+    assertTrue("TaskNames should be empty", taskNames.isEmpty());
+  }
+
+  @Test
+  public void testGetTaskNamesForProcessorPresentInJobModel() {
+    TaskName expectedTaskName = new TaskName("testTaskName");
+    String processorId = "testProcessor";
+    JobModel mockJobModel = mock(JobModel.class);
+    ContainerModel mockContainerModel = mock(ContainerModel.class);
+    Map<String, ContainerModel> mockContainers = mock(Map.class);
+
+    when(mockContainers.get(processorId)).thenReturn(mockContainerModel);
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(expectedTaskName, mock(TaskModel.class)));
+    when(mockJobModel.getContainers()).thenReturn(mockContainers);
+
+    Set<TaskName> actualTaskNames = JobModelUtil.getTaskNamesForProcessor(processorId, mockJobModel);
+    assertEquals("Expecting TaskNames size = 1", 1, actualTaskNames.size());
+    assertTrue("Expecting testTaskName to be returned", actualTaskNames.contains(expectedTaskName));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetTaskNamesForProcessorWithNullJobModel() {
+    JobModelUtil.getTaskNamesForProcessor("processor", null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithEmptyProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor("", mock(JobModel.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithNullProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor(null, mock(JobModel.class));
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testTaskToSystemStreamPartitionsWithNullJobModel() {
     JobModelUtil.getTaskToSystemStreamPartitions(null);