SAMZA-2638: Skip container stop/restart for processors that have same work assignment across rebalances (#1478)

Problem:
As part of rebalance, we always expire the current work assignment and proceed to signal consensus and start the container with new work assignment. It is inefficient for the processors to do the above when there is no changes in the work assignment between the active job model & the proposed job model.

Changes:

- Processors perform onJobModelExpired and onNewJobModel only if there are changes to their work assignment across the active and proposed job model.
- Processors no longer shutdown if they are not part of job model. Refer to the SAMZA-2638 for explanations
- Add helper methods in JobModelUtil for work assignment comparison
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index d356baf..f1d6073 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.job.model;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
@@ -119,6 +120,47 @@
     }
   }
 
+  /**
+   * Compares the {@link ContainerModel} for a given <i>processorId</i> across two {@link JobModel}.
+   * @param processorId processor id for which work assignments are compared
+   * @param first first job model
+   * @param second second job model
+   * @return true - if {@link ContainerModel} for the processor is same across the {@link JobModel}
+   *         false - otherwise
+   */
+  public static boolean compareContainerModelForProcessor(String processorId, JobModel first, JobModel second) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "Processor id cannot be blank");
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return compareContainerModel(first.getContainers().get(processorId), second.getContainers().get(processorId));
+  }
+
+  /**
+   * Helper method to compare the two input {@link ContainerModel}s.
+   * @param first first container model
+   * @param second second container model
+   * @return true - if two input {@link ContainerModel} are equal
+   *         false - otherwise
+   */
+  @VisibleForTesting
+  static boolean compareContainerModel(ContainerModel first, ContainerModel second) {
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return first.equals(second);
+  }
+
   private static String getJobModelKey(String version) {
     return String.format("%s/%s", JOB_MODEL_GENERATION_KEY, version);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 6526705..102f357 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -98,7 +98,6 @@
   private final String processorId;
 
   private final Config config;
-  private final ZkBarrierForVersionUpgrade barrier;
   private final ZkJobCoordinatorMetrics metrics;
   private final ZkLeaderElector leaderElector;
   private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
@@ -111,9 +110,13 @@
   private final CoordinatorStreamStore coordinatorStreamStore;
 
   private JobCoordinatorListener coordinatorListener = null;
-  private JobModel newJobModel;
+  // denotes the most recent job model agreed by the quorum
+  private JobModel activeJobModel;
+  // denotes job model that is latest but may have not reached consensus
+  private JobModel latestJobModel;
   private boolean hasLoadedMetadataResources = false;
   private String cachedJobModelVersion = null;
+  private ZkBarrierForVersionUpgrade barrier;
 
   @VisibleForTesting
   ZkSessionMetrics zkSessionMetrics;
@@ -232,7 +235,7 @@
 
   @Override
   public JobModel getJobModel() {
-    return newJobModel;
+    return latestJobModel;
   }
 
   @Override
@@ -269,11 +272,11 @@
 
     // Generate the JobModel
     LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
-    JobModel jobModel = generateNewJobModel(processorNodes);
+    JobModel newJobModel = generateNewJobModel(processorNodes);
 
     // Create checkpoint and changelog streams if they don't exist
     if (!hasLoadedMetadataResources) {
-      loadMetadataResources(jobModel);
+      loadMetadataResources(newJobModel);
       hasLoadedMetadataResources = true;
     }
 
@@ -283,7 +286,7 @@
     LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextJMVersion + " and processors: " + currentProcessorIds);
 
     // Publish the new job model
-    publishJobModelToMetadataStore(jobModel, nextJMVersion);
+    publishJobModelToMetadataStore(newJobModel, nextJMVersion);
 
     // Start the barrier for the job model update
     barrier.create(nextJMVersion, currentProcessorIds);
@@ -393,6 +396,90 @@
   }
 
   /**
+   * Check if the new job model contains a different work assignment for the processor compared the last active job
+   * model. In case of different work assignment, expire the current job model by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);
+    } else {
+      LOG.info("Work assignment changed for the processor {}. Notifying job model expiration to coordinator listener", processorId);
+      coordinatorListener.onJobModelExpired();
+    }
+  }
+
+  /**
+   * Checks if the new job model contains a different work assignment for the processor compared to the last active
+   * job model. In case of different work assignment, update the task locality of the tasks associated with the
+   * processor and notify new job model to the registered {@link JobCoordinatorListener}.
+   *
+   * @param newJobModel new job model agreed by the quorum
+   */
+  @VisibleForTesting
+  void onNewJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing onNewJobModel");
+    // start the container with the new model
+    if (!JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      LOG.info("Work assignment changed for the processor {}. Updating task locality and notifying coordinator listener", processorId);
+      if (newJobModel.getContainers().containsKey(processorId)) {
+        for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, newJobModel)) {
+          zkUtils.writeTaskLocality(taskName, locationId);
+        }
+
+        if (coordinatorListener != null) {
+          coordinatorListener.onNewJobModel(processorId, newJobModel);
+        }
+      }
+    } else {
+      /*
+       * The implication of work assignment remaining the same can be categorized into
+       *   1. Processor part of the job model
+       *   2. Processor not part of the job model.
+       * For both the state of the processor remains what it was when the rebalance started. e.g.,
+       *   [1] should continue to process its work assignment without any interruption as part of the rebalance. i.e.,
+       *       there will be no expiration of the existing work (a.k.a samza container won't be stopped) and also no
+       *       notification to StreamProcessor about the rebalance since work assignment didn't change.
+       *   [2] should have no work and be idle processor and will continue to be idle.
+       */
+      LOG.info("Skipping onNewJobModel since there are no changes in work assignment.");
+    }
+
+    /*
+     * Update the last active job model to new job model regardless of whether the work assignment for the processor
+     * has changed or not. It is important to do it so that all the processors has a consistent view what the latest
+     * active job model is.
+     */
+    activeJobModel = newJobModel;
+  }
+
+  @VisibleForTesting
+  JobModel getActiveJobModel() {
+    return activeJobModel;
+  }
+
+  @VisibleForTesting
+  void setActiveJobModel(JobModel jobModel) {
+    activeJobModel = jobModel;
+  }
+
+  @VisibleForTesting
+  void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade barrierUpgradeForVersion) {
+    barrier = barrierUpgradeForVersion;
+  }
+
+  /**
    * Builds the {@link GrouperMetadataImpl} based upon provided {@param jobModelVersion}
    * and {@param processorNodes}.
    * @param jobModelVersion the most recent jobModelVersion available in the zookeeper.
@@ -467,16 +554,7 @@
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
           LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
-
-          // read the new Model
-          JobModel jobModel = getJobModel();
-          // start the container with the new model
-          if (coordinatorListener != null) {
-            for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
-              zkUtils.writeTaskLocality(taskName, locationId);
-            }
-            coordinatorListener.onNewJobModel(processorId, jobModel);
-          }
+          onNewJobModel(getJobModel());
         });
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
@@ -541,21 +619,11 @@
 
         LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data);
 
-        newJobModel = readJobModelFromMetadataStore(jobModelVersion);
-        LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + newJobModel);
-
-        if (!newJobModel.getContainers().containsKey(processorId)) {
-          LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
-              processorId, newJobModel);
-          stop();
-        } else {
-          // stop current work
-          if (coordinatorListener != null) {
-            coordinatorListener.onJobModelExpired();
-          }
-          // update ZK and wait for all the processors to get this new version
-          barrier.join(jobModelVersion, processorId);
-        }
+        latestJobModel = readJobModelFromMetadataStore(jobModelVersion);
+        LOG.info("pid=" + processorId + ": new JobModel is available. Version =" + jobModelVersion + "; JobModel = " + latestJobModel);
+        checkAndExpireJobModel(latestJobModel);
+        // update ZK and wait for all the processors to get this new version
+        barrier.join(jobModelVersion, processorId);
       });
     }
 
diff --git a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 0e8baae..856b2f8 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -29,11 +29,75 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 public class TestJobModelUtil {
+  private static final String PROCESSOR_ID = "testProcessor";
+
+  @Test
+  public void testCompareContainerModel() {
+    assertTrue("Expecting null container models to return true", JobModelUtil.compareContainerModel(null, null));
+
+    assertFalse("Expecting false for two different container model",
+        JobModelUtil.compareContainerModel(mock(ContainerModel.class), mock(ContainerModel.class)));
+
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    assertTrue("Expecting true for same container model",
+        JobModelUtil.compareContainerModel(mockContainerModel, mockContainerModel));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCompareContainerModelForNullProcessor() {
+    JobModelUtil.compareContainerModelForProcessor(null, mock(JobModel.class), mock(JobModel.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCompareContainerModelForBlankProcessor() {
+    JobModelUtil.compareContainerModelForProcessor("", mock(JobModel.class), mock(JobModel.class));
+  }
+
+  /**
+   * Tests the following scenarios as part of the same tests to reduce boiler plate and potentially allow
+   * parallel executions of tests for the class.
+   *  1. Test null job models for processors
+   *  2. Test same container models across job models for processors
+   *  3. Test different container models across job models for processors
+   *  4. Test absence of container model vs presence across job models for processors
+   *
+   * The approach below leans towards performance (parallel execution) as opposed to readability; i.e sharing setup of
+   * the tests and having multiple tests that test individual scenarios. Additionally, the individual scenarios being
+   * tested are self explanatory.
+   */
+  @Test
+  public void testCompareContainerModelForProcessor() {
+    final JobModel firstJobModel = mock(JobModel.class);
+    final JobModel secondJobModel = mock(JobModel.class);
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    Map<String, ContainerModel> mockContainerModels = mock(Map.class);
+
+    assertTrue("Null job models should return true for comparison",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, null, null));
+
+    when(firstJobModel.getContainers()).thenReturn(mockContainerModels);
+    when(secondJobModel.getContainers()).thenReturn(mockContainerModels);
+    when(mockContainerModels.get(PROCESSOR_ID)).thenReturn(mockContainerModel);
+    assertTrue("Expecting both job model to have same container model for the processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, firstJobModel, secondJobModel));
+
+    when(mockContainerModels.get(PROCESSOR_ID))
+        .thenReturn(mockContainerModel)
+        .thenReturn(mock(ContainerModel.class));
+    assertFalse("Expecting container models to be different across job models for the processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, firstJobModel, secondJobModel));
+
+    when(mockContainerModels.get(PROCESSOR_ID)).thenReturn(null)
+        .thenReturn(mockContainerModel);
+    assertFalse("Expecting container models to be different across job models for the processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, firstJobModel, secondJobModel));
+  }
 
   @Test
   public void testGetTaskNamesForProcessorAbsentInJobModel() {
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index d9d5379..bf681cb 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -26,12 +26,14 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
@@ -46,32 +48,49 @@
 import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
 import org.apache.zookeeper.Watcher;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyObject;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class TestZkJobCoordinator {
+  private static final String PROCESSOR_ID = "testProcessor";
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
 
+
   private final Config config;
   private final JobModel jobModel;
   private final MetadataStore zkMetadataStore;
   private final CoordinatorStreamStore coordinatorStreamStore;
 
+  private ZkUtils zkUtils;
+
+  @Before
+  public void setup() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+  }
+
   public TestZkJobCoordinator() {
     Map<String, String> configMap = ImmutableMap.of(
         "job.coordinator.system", "kafka",
@@ -91,46 +110,111 @@
   }
 
   @Test
-  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    CountDownLatch jcShutdownLatch = new CountDownLatch(1);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+  public void testCheckAndExpireWithNoChangeInWorkAssignment() {
+    BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod =
+      (ignored, coordinatorListener) -> verifyZeroInteractions(coordinatorListener);
 
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    testNoChangesInWorkAssignmentHelper(ZkJobCoordinator::checkAndExpireJobModel, verificationMethod);
+  }
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
-        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
-    doReturn(new JobModel(new MapConfig(), new HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
-    doAnswer(new Answer<Void>() {
-      public Void answer(InvocationOnMock invocation) {
-        jcShutdownLatch.countDown();
-        return null;
-      }
-    }).when(zkJobCoordinator).stop();
+  @Test
+  public void testCheckAndExpireWithChangeInWorkAssignment() {
+    final String processorId = "testProcessor";
+    JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
 
-    final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
-    zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
-    verify(zkJobCoordinator, Mockito.atMost(1)).stop();
-    assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES));
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(processorId, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.checkAndExpireJobModel(mock(JobModel.class));
+    verify(mockListener, times(1)).onJobModelExpired();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAndExpireJobModelWithNullJobModel() {
+    final String processorId = "testProcessor";
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(processorId, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.checkAndExpireJobModel(null);
+  }
+
+  @Test
+  public void testOnNewJobModelWithChangeInWorkAssignment() {
+    final TaskName taskName = new TaskName("task1");
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    final JobModel mockJobModel = mock(JobModel.class);
+
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, mock(TaskModel.class)));
+    when(mockJobModel.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel));
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.onNewJobModel(mockJobModel);
+
+    verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());
+    verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, mockJobModel);
+    assertEquals("Active job model should be updated with the new job model", mockJobModel,
+        zkJobCoordinator.getActiveJobModel());
+  }
+
+  @Test
+  public void testOnNewJobModelWithNoChangesInWorkAssignment() {
+    BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod = (zkUtils, coordinatorListener) -> {
+      verify(zkUtils, times(0)).writeTaskLocality(any(), any());
+      verifyZeroInteractions(coordinatorListener);
+    };
+
+    testNoChangesInWorkAssignmentHelper(ZkJobCoordinator::onNewJobModel, verificationMethod);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testOnNewJobModelWithNullJobModel() {
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.onNewJobModel(null);
+  }
+
+  /**
+   * Test job model version changed changes to work assignment. In this scenario, existing work should
+   * be stopped a.k.a processor should stop the container through the listener. The processor then proceeds to join
+   * the barrier to notify its acceptance on the proposed job model.
+   */
+  @Test
+  public void testJobModelVersionChangeWithChangeInWorkAssignment() throws Exception {
+    BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> verificationMethod =
+      (barrier, listener) -> {
+        verify(listener, times(1)).onJobModelExpired();
+        verify(barrier, times(1)).join(TEST_JOB_MODEL_VERSION, PROCESSOR_ID);
+      };
+    testJobModelVersionChangeHelper(null, mock(JobModel.class), verificationMethod);
+  }
+
+  /**
+   * Test job model version changed without any changes to work assignment. In this scenario, existing work should
+   * not be stopped a.k.a processor shouldn't stop the container. However, the processor proceeds to join the barrier
+   * to notify its acceptance on the proposed job model.
+   */
+  @Test
+  public void testJobModelVersionChangeWithNoChangeInWorkAssignment() throws Exception {
+    final JobModel jobModel = mock(JobModel.class);
+    BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> verificationMethod =
+      (barrier, listener) -> {
+        verifyZeroInteractions(listener);
+        verify(barrier, times(1)).join(TEST_JOB_MODEL_VERSION, PROCESSOR_ID);
+      };
+    testJobModelVersionChangeHelper(jobModel, jobModel, verificationMethod);
   }
 
   @Test
   public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
@@ -145,19 +229,12 @@
   }
 
   @Test
-  public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+  public void testZookeeperSessionMetricsAreUpdatedCorrectly() {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
@@ -178,18 +255,11 @@
 
   @Test
   public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -202,18 +272,11 @@
 
   @Test
   public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
@@ -230,18 +293,11 @@
 
   @Test
   public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
@@ -255,18 +311,11 @@
 
   @Test
   public void testLoadMetadataResources() throws IOException {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils,
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, config, new NoOpMetricsRegistry(), zkUtils,
         zkMetadataStore, coordinatorStreamStore));
     doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
@@ -285,18 +334,11 @@
 
   @Test
   public void testDoOnProcessorChange() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config,
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, config,
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
     doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
@@ -308,4 +350,49 @@
     verify(zkUtils).publishJobModelVersion(anyString(), anyString());
     verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
   }
+
+  private void testNoChangesInWorkAssignmentHelper(BiConsumer<ZkJobCoordinator, JobModel> testMethod,
+      BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod) {
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    final JobModel mockJobModel = mock(JobModel.class);
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(mockJobModel);
+
+    testMethod.accept(zkJobCoordinator, mockJobModel);
+    verificationMethod.accept(zkUtils, mockListener);
+  }
+
+  private void testJobModelVersionChangeHelper(JobModel activeJobModel, JobModel newJobModel,
+      BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> verificationMethod) throws InterruptedException {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    final ScheduleAfterDebounceTime mockDebounceTimer = mock(ScheduleAfterDebounceTime.class);
+    final ZkBarrierForVersionUpgrade mockBarrier = mock(ZkBarrierForVersionUpgrade.class);
+
+    doAnswer(ctx -> {
+      Object[] args = ctx.getArguments();
+      ((Runnable) args[2]).run();
+      completionLatch.countDown();
+      return null;
+    }).when(mockDebounceTimer).scheduleAfterDebounceTime(anyString(), anyLong(), anyObject());
+
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(activeJobModel);
+    zkJobCoordinator.setZkBarrierUpgradeForVersion(mockBarrier);
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    doReturn(newJobModel).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
+
+    final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler =
+        zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
+    zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
+    completionLatch.await(1, TimeUnit.SECONDS);
+
+    verificationMethod.accept(mockBarrier, mockListener);
+  }
 }