Batch Auto Pause improvements (#98)

* Fixing auto pause leaving an instance without processing before pausing the update.

* Simplifying code. Removing final pause after update has reached a terminal state.
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 1a54c4c..6adb82d 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -32,7 +32,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -639,6 +638,17 @@
   @VisibleForTesting
   static final String PULSE_TIMEOUT_MESSAGE = "Pulses from external service have timed out.";
 
+  // Determines whether it is necessary to skip evaluating a side effect if we will be pausing
+  // the update after this evaluation
+  private boolean skipSideEffect(
+      boolean pauseAfterBatch,
+      SideEffect sideEffect) {
+    return pauseAfterBatch
+        && Collections.disjoint(
+        sideEffect.getStatusChanges(),
+        InstanceUpdateStatus.TERMINAL_STATUSES);
+  }
+
   private void evaluateUpdater(
       final MutableStoreProvider storeProvider,
       final UpdateFactory.Update update,
@@ -667,18 +677,24 @@
 
     EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider);
 
-    LOG.info(key + " evaluation result: " + result);
-
-    final boolean autoPauseAfterBatch =
-        isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy());
-    if (autoPauseAfterBatch && maybeAutoPause(summary, result)) {
+    LOG.info("{} evaluation result: {}", key, result);
+    final boolean autoPauseAfterCurrentBatch =
+        isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy())
+        && maybeAutoPause(summary, result);
+    if (autoPauseAfterCurrentBatch) {
       changeUpdateStatus(storeProvider,
           summary,
           newEvent(getPausedState(summary.getState().getStatus())).setMessage(UPDATE_AUTO_PAUSED));
-      return;
     }
 
     for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
+      // If we're pausing after processing this set of side effects, only process the side effects
+      // which are in a terminal state in order to avoid starting new shards after the pause
+      // has kicked in.
+      if (skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
+        continue;
+      }
+
       Iterable<InstanceUpdateStatus> statusChanges;
 
       int instanceId = entry.getKey();
@@ -689,7 +705,7 @@
           .collect(Collectors.toList());
 
       Set<JobUpdateAction> savedActions =
-          FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
+          savedEvents.stream().map(EVENT_TO_ACTION).collect(Collectors.toSet());
 
       // Don't bother persisting a sequence of status changes that represents an instance that
       // was immediately recognized as being healthy and in the desired state.
@@ -747,34 +763,34 @@
         }
       }
 
-      if (autoPauseAfterBatch) {
-        instancesSeen.remove(key);
-      }
       changeUpdateStatus(storeProvider, summary, event);
-    } else {
-      LOG.info("Executing side-effects for update of " + key + ": " + result.getSideEffects());
-      for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
-        IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());
+      return;
+    }
 
-        Optional<InstanceAction> action = entry.getValue().getAction();
-        if (action.isPresent()) {
-          Optional<InstanceActionHandler> handler = action.get().getHandler();
-          if (handler.isPresent()) {
-            Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
-                instance,
-                instructions,
-                storeProvider,
-                stateManager,
-                updateAgentReserver,
-                updaterStatus,
-                key,
-                slaKillController);
-            if (reevaluateDelay.isPresent()) {
-              executor.schedule(
-                  getDeferredEvaluator(instance, key),
-                  reevaluateDelay.get().getValue(),
-                  reevaluateDelay.get().getUnit().getTimeUnit());
-            }
+    LOG.info("Executing side-effects for update of {}: {}",
+        key,
+        result.getSideEffects().entrySet());
+    for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
+      IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());
+
+      Optional<InstanceAction> action = entry.getValue().getAction();
+      if (action.isPresent() && !skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
+        Optional<InstanceActionHandler> handler = action.get().getHandler();
+        if (handler.isPresent()) {
+          Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
+              instance,
+              instructions,
+              storeProvider,
+              stateManager,
+              updateAgentReserver,
+              updaterStatus,
+              key,
+              slaKillController);
+          if (reevaluateDelay.isPresent()) {
+            executor.schedule(
+                getDeferredEvaluator(instance, key),
+                reevaluateDelay.get().getValue(),
+                reevaluateDelay.get().getUnit().getTimeUnit());
           }
         }
       }
@@ -869,11 +885,9 @@
       Set<Integer> instancesCached = instancesSeen.get(key);
       Set<Integer> instancesBeingUpdated = result.getSideEffects().keySet();
 
-      // On the final batch, pause for acknowledgement and remove the cache of instances.
-      // This will cause the else branch to get run on resume and the update will finish.
       if (result.getStatus() == SUCCEEDED) {
         instancesSeen.remove(key);
-        return true;
+        return false;
       }
 
       // If the update evaluation is dealing with new instances, that signals we are at a barrier
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
index aa736a6..1e6dd3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
@@ -18,6 +18,7 @@
 import java.util.Set;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;
 
@@ -139,6 +140,9 @@
     /**
      * The instance failed to update and is no longer being monitored.
      */
-    FAILED
+    FAILED;
+
+    public static final Set<InstanceUpdateStatus> TERMINAL_STATUSES =
+        ImmutableSet.of(SUCCEEDED, FAILED);
   }
 }
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
index d15217b..376e1c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -81,7 +81,7 @@
   }
 
   /**
-   * Get the lastest {@link JobUpdateStatus} for an update.
+   * Get the latest {@link JobUpdateStatus} for an update.
    */
   static JobUpdateStatus getJobUpdateStatus(IJobUpdateDetails jobUpdateDetails) {
     return Iterables.getLast(jobUpdateDetails.getUpdateEvents()).getStatus();
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 82f9ef4..4bafb44 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -1842,23 +1842,18 @@
     updater.start(update, AUDIT);
     actions.put(0, INSTANCE_UPDATING).putAll(1, INSTANCE_UPDATING);
     assertState(ROLLING_FORWARD, actions.build());
-    changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
-    clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
     changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
-    clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
-
-    // Instance 1 finished first, but update does not yet proceed until 0 finishes.
-    actions.put(1, INSTANCE_UPDATED);
-    assertState(ROLLING_FORWARD, actions.build());
+    changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
 
+    actions.put(0, INSTANCE_UPDATED).put(1, INSTANCE_UPDATED);
+
     // Update should now be paused
     assertState(ROLL_FORWARD_PAUSED, actions.build());
 
     // Continue the update
     updater.resume(UPDATE_ID, AUDIT);
 
-    actions.put(0, INSTANCE_UPDATED);
     actions.put(2, INSTANCE_UPDATING);
 
     assertState(ROLLING_FORWARD, actions.build());
@@ -1866,14 +1861,8 @@
     // Instance 2 is updated.
     changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
-
-    // Update should now be paused for a second time
-    assertState(ROLL_FORWARD_PAUSED, actions.build());
-
-    // Continue the update
-    updater.resume(UPDATE_ID, AUDIT);
-
     actions.put(2, INSTANCE_UPDATED);
+
     assertState(ROLLED_FORWARD, actions.build());
 
     assertJobState(
@@ -1911,11 +1900,12 @@
     changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
 
+    actions.put(0, INSTANCE_UPDATED);
+
     // Update should now be paused after first batch is done.
     assertState(ROLL_FORWARD_PAUSED, actions.build());
     updater.resume(UPDATE_ID, AUDIT);
 
-    actions.put(0, INSTANCE_UPDATED);
     actions.put(1, INSTANCE_UPDATING).put(2, INSTANCE_UPDATING);
     assertState(ROLLING_FORWARD, actions.build());
 
@@ -1926,33 +1916,25 @@
     changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
 
-    // Instance 2 will finish before instance 1
+    actions.put(1, INSTANCE_UPDATED);
     actions.put(2, INSTANCE_UPDATED);
 
     // Second autoPause at second barrier
     assertState(ROLL_FORWARD_PAUSED, actions.build());
 
     updater.resume(UPDATE_ID, AUDIT);
-    actions.put(1, INSTANCE_UPDATED);
     actions.put(3, INSTANCE_UPDATING).put(4, INSTANCE_UPDATING).put(5, INSTANCE_UPDATING);
 
     assertState(ROLLING_FORWARD, actions.build());
 
     // Third batch is moving forward.
-    // Make instance 4 the instance that waits for final transition to SUCCEED
-    changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
     changeState(JOB, 3, FINISHED, ASSIGNED, STARTING, RUNNING);
-    clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 3, Time.MILLISECONDS));
     changeState(JOB, 4, FINISHED, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
 
-    actions.put(3, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);
+    actions.put(3, INSTANCE_UPDATED).put(4, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);
 
-    // Third barrier
-    assertState(ROLL_FORWARD_PAUSED, actions.build());
-    updater.resume(UPDATE_ID, AUDIT);
-
-    actions.put(4, INSTANCE_UPDATED);
     assertState(ROLLED_FORWARD, actions.build());
 
     assertJobState(
@@ -2001,11 +1983,13 @@
     changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
     clock.advance(WATCH_TIMEOUT);
 
+    actions.put(2, INSTANCE_UPDATED);
+
     // Update should now be paused after first batch is done.
     assertState(ROLL_FORWARD_PAUSED, actions.build());
     updater.resume(UPDATE_ID, AUDIT);
 
-    actions.put(2, INSTANCE_UPDATED).put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
+    actions.put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
     assertState(ROLLING_FORWARD, actions.build());
 
     // Move on to batch two