Batch Auto Pause improvements (#98)
* Fix auto pause leaving one instance unprocessed before the update is paused.
* Simplify the code and remove the final pause once the update has reached a terminal state.
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 1a54c4c..6adb82d 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -32,7 +32,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -639,6 +638,17 @@
@VisibleForTesting
static final String PULSE_TIMEOUT_MESSAGE = "Pulses from external service have timed out.";
+ // Returns true when a side effect must be skipped: pauseAfterBatch is set and none of the
+ // side effect's status changes is in InstanceUpdateStatus.TERMINAL_STATUSES.
+ private boolean skipSideEffect(
+ boolean pauseAfterBatch,
+ SideEffect sideEffect) {
+ return pauseAfterBatch
+ && Collections.disjoint(
+ sideEffect.getStatusChanges(),
+ InstanceUpdateStatus.TERMINAL_STATUSES);
+ }
+
private void evaluateUpdater(
final MutableStoreProvider storeProvider,
final UpdateFactory.Update update,
@@ -667,18 +677,24 @@
EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider);
- LOG.info(key + " evaluation result: " + result);
-
- final boolean autoPauseAfterBatch =
- isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy());
- if (autoPauseAfterBatch && maybeAutoPause(summary, result)) {
+ LOG.info("{} evaluation result: {}", key, result);
+ final boolean autoPauseAfterCurrentBatch =
+ isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy())
+ && maybeAutoPause(summary, result);
+ if (autoPauseAfterCurrentBatch) {
changeUpdateStatus(storeProvider,
summary,
newEvent(getPausedState(summary.getState().getStatus())).setMessage(UPDATE_AUTO_PAUSED));
- return;
}
for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
+ // If we're pausing after processing this set of side effects, only process the side effects
+ // which are in a terminal state in order to avoid starting new shards after the pause
+ // has kicked in.
+ if (skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
+ continue;
+ }
+
Iterable<InstanceUpdateStatus> statusChanges;
int instanceId = entry.getKey();
@@ -689,7 +705,7 @@
.collect(Collectors.toList());
Set<JobUpdateAction> savedActions =
- FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
+ savedEvents.stream().map(EVENT_TO_ACTION).collect(Collectors.toSet());
// Don't bother persisting a sequence of status changes that represents an instance that
// was immediately recognized as being healthy and in the desired state.
@@ -747,34 +763,34 @@
}
}
- if (autoPauseAfterBatch) {
- instancesSeen.remove(key);
- }
changeUpdateStatus(storeProvider, summary, event);
- } else {
- LOG.info("Executing side-effects for update of " + key + ": " + result.getSideEffects());
- for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
- IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());
+ return;
+ }
- Optional<InstanceAction> action = entry.getValue().getAction();
- if (action.isPresent()) {
- Optional<InstanceActionHandler> handler = action.get().getHandler();
- if (handler.isPresent()) {
- Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
- instance,
- instructions,
- storeProvider,
- stateManager,
- updateAgentReserver,
- updaterStatus,
- key,
- slaKillController);
- if (reevaluateDelay.isPresent()) {
- executor.schedule(
- getDeferredEvaluator(instance, key),
- reevaluateDelay.get().getValue(),
- reevaluateDelay.get().getUnit().getTimeUnit());
- }
+ LOG.info("Executing side-effects for update of {}: {}",
+ key,
+ result.getSideEffects().entrySet());
+ for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
+ IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());
+
+ Optional<InstanceAction> action = entry.getValue().getAction();
+ if (action.isPresent() && !skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
+ Optional<InstanceActionHandler> handler = action.get().getHandler();
+ if (handler.isPresent()) {
+ Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
+ instance,
+ instructions,
+ storeProvider,
+ stateManager,
+ updateAgentReserver,
+ updaterStatus,
+ key,
+ slaKillController);
+ if (reevaluateDelay.isPresent()) {
+ executor.schedule(
+ getDeferredEvaluator(instance, key),
+ reevaluateDelay.get().getValue(),
+ reevaluateDelay.get().getUnit().getTimeUnit());
}
}
}
@@ -869,11 +885,9 @@
Set<Integer> instancesCached = instancesSeen.get(key);
Set<Integer> instancesBeingUpdated = result.getSideEffects().keySet();
- // On the final batch, pause for acknowledgement and remove the cache of instances.
- // This will cause the else branch to get run on resume and the update will finish.
if (result.getStatus() == SUCCEEDED) {
instancesSeen.remove(key);
- return true;
+ return false;
}
// If the update evaluation is dealing with new instances, that signals we are at a barrier
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
index aa736a6..1e6dd3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/SideEffect.java
@@ -18,6 +18,7 @@
import java.util.Set;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;
@@ -139,6 +140,9 @@
/**
* The instance failed to update and is no longer being monitored.
*/
- FAILED
+ FAILED;
+
+ public static final Set<InstanceUpdateStatus> TERMINAL_STATUSES =
+ ImmutableSet.of(SUCCEEDED, FAILED);
}
}
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
index d15217b..376e1c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -81,7 +81,7 @@
}
/**
- * Get the lastest {@link JobUpdateStatus} for an update.
+ * Get the latest {@link JobUpdateStatus} for an update.
*/
static JobUpdateStatus getJobUpdateStatus(IJobUpdateDetails jobUpdateDetails) {
return Iterables.getLast(jobUpdateDetails.getUpdateEvents()).getStatus();
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 82f9ef4..4bafb44 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -1842,23 +1842,18 @@
updater.start(update, AUDIT);
actions.put(0, INSTANCE_UPDATING).putAll(1, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
- changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
- clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
- clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
-
- // Instance 1 finished first, but update does not yet proceed until 0 finishes.
- actions.put(1, INSTANCE_UPDATED);
- assertState(ROLLING_FORWARD, actions.build());
+ changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
+ actions.put(0, INSTANCE_UPDATED).put(1, INSTANCE_UPDATED);
+
// Update should now be paused
assertState(ROLL_FORWARD_PAUSED, actions.build());
// Continue the update
updater.resume(UPDATE_ID, AUDIT);
- actions.put(0, INSTANCE_UPDATED);
actions.put(2, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
@@ -1866,14 +1861,8 @@
// Instance 2 is updated.
changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
-
- // Update should now be paused for a second time
- assertState(ROLL_FORWARD_PAUSED, actions.build());
-
- // Continue the update
- updater.resume(UPDATE_ID, AUDIT);
-
actions.put(2, INSTANCE_UPDATED);
+
assertState(ROLLED_FORWARD, actions.build());
assertJobState(
@@ -1911,11 +1900,12 @@
changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
+ actions.put(0, INSTANCE_UPDATED);
+
// Update should now be paused after first batch is done.
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);
- actions.put(0, INSTANCE_UPDATED);
actions.put(1, INSTANCE_UPDATING).put(2, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
@@ -1926,33 +1916,25 @@
changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
- // Instance 2 will finish before instance 1
+ actions.put(1, INSTANCE_UPDATED);
actions.put(2, INSTANCE_UPDATED);
// Second autoPause at second barrier
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);
- actions.put(1, INSTANCE_UPDATED);
actions.put(3, INSTANCE_UPDATING).put(4, INSTANCE_UPDATING).put(5, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
// Third batch is moving forward.
- // Make instance 4 the instance that waits for final transition to SUCCEED
- changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
changeState(JOB, 3, FINISHED, ASSIGNED, STARTING, RUNNING);
- clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 3, Time.MILLISECONDS));
changeState(JOB, 4, FINISHED, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
- actions.put(3, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);
+ actions.put(3, INSTANCE_UPDATED).put(4, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);
- // Third barrier
- assertState(ROLL_FORWARD_PAUSED, actions.build());
- updater.resume(UPDATE_ID, AUDIT);
-
- actions.put(4, INSTANCE_UPDATED);
assertState(ROLLED_FORWARD, actions.build());
assertJobState(
@@ -2001,11 +1983,13 @@
changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);
+ actions.put(2, INSTANCE_UPDATED);
+
// Update should now be paused after first batch is done.
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);
- actions.put(2, INSTANCE_UPDATED).put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
+ actions.put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
// Move on to batch two