SAMZA-2663: Handle job model expiration and new job model flows for multiple incomplete rebalances (#1528)

Problem:
As part of SAMZA-2638, we introduced skipping container restart and stops on no changes to work assignment for processors across rebalances. However, we only update the active job model with the proposed job model on starting the container as part of onNewJobModel. This leads to a scenario where the processor is stopped but the future rebalances assume the container is still running. More information on scenario below.

Changes:

Track job model expiration
onNewJobModel triggers new job model as long as the active job model has been expired
Handle no change in work assignment optimization only during checkJobModelExpired flow.
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f41ee8c..447143a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -110,6 +110,10 @@
   private final MetadataStore jobModelMetadataStore;
   private final CoordinatorStreamStore coordinatorStreamStore;
 
+  // It is sufficient for the field to be volatile as the flows that read/update execute on debounce timer which is single threaded
+  // Choice of atomic boolean is purely for convenience for operations like compareAndSet to enforce invariant checks.
+  private final AtomicBoolean jobModelExpired = new AtomicBoolean(false);
+
   private JobCoordinatorListener coordinatorListener = null;
   // denotes the most recent job model agreed by the quorum
   private JobModel activeJobModel;
@@ -441,6 +445,7 @@
     } else {
       LOG.info("Work assignment changed for the processor {}. Notifying job model expiration to coordinator listener", processorId);
       coordinatorListener.onJobModelExpired();
+      jobModelExpired.set(true);
     }
   }
 
@@ -455,7 +460,7 @@
   void onNewJobModel(JobModel newJobModel) {
     Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing onNewJobModel");
     // start the container with the new model
-    if (!JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+    if (jobModelExpired.compareAndSet(true, false)) {
       LOG.info("Work assignment changed for the processor {}. Updating task locality and notifying coordinator listener", processorId);
       if (newJobModel.getContainers().containsKey(processorId)) {
         for (TaskName taskName : JobModelUtil.getTaskNamesForProcessor(processorId, newJobModel)) {
@@ -468,6 +473,7 @@
       }
     } else {
       /*
+       * We don't expire the job model if the proposed work assignment is same as the current work assignment.
        * The implication of work assignment remaining the same can be categorized into
        *   1. Processor part of the job model
        *   2. Processor not part of the job model.
@@ -499,6 +505,16 @@
   }
 
   @VisibleForTesting
+  boolean getJobModelExpired() {
+    return jobModelExpired.get();
+  }
+
+  @VisibleForTesting
+  void setJobModelExpired(boolean value) {
+    jobModelExpired.set(value);
+  }
+
+  @VisibleForTesting
   void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
     debounceTimer = scheduleAfterDebounceTime;
   }
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 0ccb8d9..586dfc0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -54,7 +54,9 @@
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
@@ -66,6 +68,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
@@ -113,6 +116,58 @@
   }
 
   @Test
+  public void testCheckAndExpireWithMultipleRebalances() {
+    final TaskName taskName = new TaskName("task1");
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    final JobModel jobModelVersion1 = mock(JobModel.class);
+    final JobModel jobModelVersion2 = mock(JobModel.class);
+    final JobModel jobModelVersion3 = jobModelVersion1;
+
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, mock(TaskModel.class)));
+    when(jobModelVersion3.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel));
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(), new NoOpMetricsRegistry(),
+        zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(jobModelVersion1);
+
+    /*
+     * The following mimics the scenario where new work assignment(V2) is proposed by the leader and the work assignment
+     * differs from the active work assignment(V1) and hence results in job model expiration
+     */
+    zkJobCoordinator.checkAndExpireJobModel(jobModelVersion2);
+
+    verify(mockListener, times(1)).onJobModelExpired();
+    assertTrue("JobModelExpired should be true for work assignment changes", zkJobCoordinator.getJobModelExpired());
+    assertEquals("Active job model shouldn't be updated", jobModelVersion1, zkJobCoordinator.getActiveJobModel());
+
+    /*
+     * The following mimics the scenario where leader kicked off another rebalance where the new work assignment(V3)
+     * is same as the old work assignment(V1) and doesn't trigger job model expiration. We check the interactions w/
+     * the listener to ensure job model expiration isn't invoked. However, the previous rebalance should have already
+     * triggered job model expiration and set the job model expired flag to true
+     */
+    zkJobCoordinator.checkAndExpireJobModel(jobModelVersion1);
+    verifyNoMoreInteractions(mockListener);
+    assertTrue("JobModelExpired should remain unchanged", zkJobCoordinator.getJobModelExpired());
+    assertEquals("Active job model shouldn't be updated", jobModelVersion1, zkJobCoordinator.getActiveJobModel());
+
+
+    /*
+     * The following mimics the scenario where the new work assignment(V3) proposed by the leader is accepted and
+     * on new job model is invoked. Even though the work assignment remains the same w/ the active job model version,
+     * onNewJobModel is invoked on the listener as an intermediate rebalance expired the old work assignment(V1)
+     */
+    zkJobCoordinator.onNewJobModel(jobModelVersion3);
+    verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, jobModelVersion3);
+    verify(zkUtils, times(1)).writeTaskLocality(any(), any());
+
+    assertEquals("Active job model should be updated to new job model", zkJobCoordinator.getActiveJobModel(), jobModelVersion3);
+    assertFalse("JobModelExpired should be set to false after onNewJobModel", zkJobCoordinator.getJobModelExpired());
+  }
+
+  @Test
   public void testCheckAndExpireWithNoChangeInWorkAssignment() {
     BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod =
       (ignored, coordinatorListener) -> verifyZeroInteractions(coordinatorListener);
@@ -155,6 +210,7 @@
     ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
     zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setJobModelExpired(true);
     zkJobCoordinator.onNewJobModel(mockJobModel);
 
     verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());