Staggered (Variable batch) Updates (#37)

Updates can now be carried out in batch sizes of differing sizes. 

Changes:

* Adding an example of an update schema with a variable batch update.

* UI now displays the update strategy used by the update along with the max amount of parallel instances being updated at once, batch size, or variable batch update sizes.

Compatibility with 0.21.0:

* Converting old schema to new schema upon receiving it for compatibility with clients using the older schema to ease transition from 0.21.0 to 0.22.0.

* Back-fill will port old thrift schema to new update strategies.

*  batch size is set from update strategy in order to ensure backwards compatibility in case of rolling back to version 0.21.0.

Tests added:

* Added tests that use variable update to verify that it works as expected both in rolling_forward and rolling_backwards cases and various corner cases.

* Added a fail fast tests when combining update strategies and wait_for_batch_completion or batch size.

Future TODO:

* Added TODO to add verification to the number of instances running on end to end tests during batch updates.

* Remove deprecated Thrift fields once 0.22.0 is released.

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index e6edbde..4212f11 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,3 +1,27 @@
+0.22.0 (unreleased)
+======
+
+### New/updated:
+- New update strategy added: Variable Batch Update. With this strategy, a job may be updated in
+  in batches of different sizes. For example, an update which modifies a total of 10 instances may
+  be done in batch sizes of 2, 3, and 5. The number of updated instances must equal the size of the
+  current group size in order to move to the next group size. If the number of updated instances is
+  greater to the sum of all group sizes, the last group size will be used in perpetuity until all 
+  instances are updated.
+  A new field has been added to `UpdateConfig` called `update_strategy`.
+  Update strategy may take a `QueueUpdateStrategy`, `BatchUpdateStrategy`,
+  or a `VariableBatchUpdateStrategy` object. `QueueUpdateStrategy` and `BatchUpdateStrategy` take
+  a single integer argument while `VariableBatchUpdateStrategy` takes a list of positive integers
+  as an argument.
+  
+### Deprecations and removals:
+
+- Deprecated use of Thrift fields `JobUpdateSettings.waitForBatchCompletion` and
+  `JobUpdateSettings.updateGroupSize`. Please set the proper `JobUpdateSettings.updateStrategy`
+  instead. Note that these same constructs, as represented in the Aurora DSL, are still valid
+  as they will be converted to the new field automatically by the client
+  for backwards compatibility.
+  
 0.21.0
 ======
 
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index dac2267..60bf9b9 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -716,9 +716,36 @@
   2: string id
 }
 
-/** Job update thresholds and limits. */
+/** Limits the amount of active changes being made to instances to groupSize. */
+struct QueueJobUpdateStrategy {
+  1: i32 groupSize
+}
+
+/** Similar to Queue strategy but will not start a new group until all instances in an active
+ * group have finished updating.
+ */
+struct BatchJobUpdateStrategy {
+  1: i32 groupSize
+}
+
+/** Same as Batch strategy but each time an active group completes, the size of the next active
+ * group may change.
+ */
+struct VariableBatchJobUpdateStrategy {
+  1: list<i32> groupSizes
+}
+
+union JobUpdateStrategy {
+ 1: QueueJobUpdateStrategy queueStrategy
+ 2: BatchJobUpdateStrategy batchStrategy
+ 3: VariableBatchJobUpdateStrategy varBatchStrategy
+}
+
+/** Job update thresholds and limits. **/
 struct JobUpdateSettings {
-  /** Max number of instances being updated at any given moment. */
+  /** Deprecated, please set value inside of desired update strategy instead.
+   * Max number of instances being updated at any given moment.
+   */
   1: i32 updateGroupSize
 
   /** Max number of instance failures to tolerate before marking instance as FAILED. */
@@ -736,7 +763,7 @@
   /** Instance IDs to act on. All instances will be affected if this is not set. */
   7: set<Range> updateOnlyTheseInstances
 
-  /**
+  /** Deprecated, please set updateStrategy to the Batch strategy instead.
    * If true, use updateGroupSize as strict batching boundaries, and avoid proceeding to another
    * batch until the preceding batch finishes updating.
    */
@@ -755,6 +782,9 @@
    * differs between the old and new task configurations, updates will use the newest configuration.
    */
   10: optional bool slaAware
+
+  /** Update strategy to be used for the update. See JobUpdateStrategy for choices. */
+  11: optional JobUpdateStrategy updateStrategy
 }
 
 /** Event marking a state transition in job update lifecycle. */
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index 0632559..eb26f56 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -376,8 +376,39 @@
 | ```rollback_on_failure```    | boolean  | When False, prevents auto rollback of a failed update (Default: True)
 | ```wait_for_batch_completion```| boolean | When True, all threads from a given batch will be blocked from picking up new instances until the entire batch is updated. This essentially simulates the legacy sequential updater algorithm. (Default: False)
 | ```pulse_interval_secs```    | Integer  |  Indicates a [coordinated update](../features/job-updates.md#coordinated-job-updates). If no pulses are received within the provided interval the update will be blocked. Beta-updater only. Will fail on submission when used with client updater. (Default: None)
+| ```update_strategy```        | Choice of ```QueueUpdateStrategy```,  ```BatchUpdateStrategy```, or ```VariableBatchUpdateStrategy``` object | Indicate which update strategy to use for this update.
 | ```sla_aware```              | boolean  | When True, updates will only update an instance if it does not break the task's specified [SLA Requirements](../features/sla-requirements.md). (Default: None)
 
+### QueueUpdateStrategy Objects
+
+Update strategy which will keep the active updating instances at size ```batch_size``` throughout the update until there are no more instances left to update.
+
+| object                       | type     | description
+| ---------------------------- | :------: | ------------
+| ```batch_size```             | Integer  | Maximum number of shards to be updated in one iteration (Default: 1)
+
+### BatchUpdateStrategy Objects
+
+Update strategy which will wait until a maximum of ``batch_size`` number of instances are updated before continuing on to the next group until all instances are updated.
+
+| object                       | type     | description
+| ---------------------------- | :------: | ------------
+| ```batch_size```             | Integer  | Maximum number of shards to be updated in one iteration (Default: 1)
+
+### VariableBatchUpdateStrategy Objects
+
+Similar to Batch Update strategy, this strategy will wait until all instances in a current group are
+updated before updating more instances. However, instead of maintaining a static group size, the
+size of each group may change as the update progresses. For example, an update which modifies a
+total of 10 instances may be done in batch sizes of 2, 3, and 5. If the number of instances to
+be updated are greater than the sum of the groups, the last group size will be used in
+perpetuity until all instances are updated. Following the previous example, if instead of 10
+instances 20 instances are modified, the update groups would become: 2, 3, 5, 5, 5.
+
+| object                       | type     | description
+| ---------------------------- | :------: | ------------
+| ```batch_sizes```             | List(Integer)  | Maximum number of shards to be updated per iteration. As each iteration completes, the next iteration's group size may change. If there are still instances that need to be updated after all sizes are used, the last size will be reused for the remainder of the update.
+
 #### Using the `sla_aware` option
 
 There are some nuances around the `sla_aware` option that users should be aware of:
diff --git a/examples/jobs/hello_world_variable_update.aurora b/examples/jobs/hello_world_variable_update.aurora
new file mode 100644
index 0000000..4fc42fa
--- /dev/null
+++ b/examples/jobs/hello_world_variable_update.aurora
@@ -0,0 +1,39 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hello = Process(
+  name = 'hello',
+  cmdline = """
+    while true; do
+      echo hello world
+      sleep 10
+    done
+  """)
+
+task = SequentialTask(
+  processes = [hello],
+  resources = Resources(cpu = .10, ram = 8*MB, disk = 8*MB))
+
+jobs = [
+  Service(
+    task = task,
+    cluster = 'devcluster',
+    role = 'www-data',
+    environment = 'prod',
+    name = 'hello',
+    instances = 6,
+    update_config = UpdateConfig(
+        update_strategy = VariableBatchUpdateStrategy(batch_sizes = [1, 2, 3]))
+  )
+]
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
index 41a2f0b..240acdf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
@@ -20,9 +20,13 @@
 import com.google.inject.Inject;
 
 import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.gen.BatchJobUpdateStrategy;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStrategy;
+import org.apache.aurora.gen.QueueJobUpdateStrategy;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -32,6 +36,7 @@
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
 import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -143,8 +148,27 @@
       backfillTask(instructions.getDesiredState().getTask());
     }
 
+    backfillUpdateStrategy(instructions.getSettings());
+
     instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
 
     return IJobUpdate.build(update);
   }
+
+  public static void backfillUpdateStrategy(JobUpdateSettings settings) {
+    IJobUpdateSettings updateSettings = IJobUpdateSettings.build(settings);
+
+    // Convert old job update schema to have an update strategy
+    if (!updateSettings.isSetUpdateStrategy()) {
+      if (updateSettings.isWaitForBatchCompletion()) {
+        settings.setUpdateStrategy(
+            JobUpdateStrategy.batchStrategy(
+                new BatchJobUpdateStrategy().setGroupSize(updateSettings.getUpdateGroupSize())));
+      } else {
+        settings.setUpdateStrategy(
+            JobUpdateStrategy.queueStrategy(
+                new QueueJobUpdateStrategy().setGroupSize(updateSettings.getUpdateGroupSize())));
+      }
+    }
+  }
 }
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 74820c9..62a715c 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -67,6 +67,7 @@
 import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.StartMaintenanceResult;
 import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.gen.VariableBatchJobUpdateStrategy;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.base.Query;
@@ -792,9 +793,33 @@
       return invalidRequest(NON_SERVICE_TASK);
     }
 
+    int totalInstancesFromGroups;
     JobUpdateSettings settings = requireNonNull(mutableRequest.getSettings());
-    if (settings.getUpdateGroupSize() <= 0) {
-      return invalidRequest(INVALID_GROUP_SIZE);
+
+    // Gracefully handle a client sending an update with an older thrift schema
+    // TODO(rdelvalle): Remove after version 0.22.0 ships
+    ThriftBackfill.backfillUpdateStrategy(settings);
+
+    // Keep old job schema in case we want to revert to a lower version of Aurora that doesn't
+    // support variable update group sizes
+    if (settings.getUpdateStrategy().isSetQueueStrategy()) {
+      totalInstancesFromGroups = settings.getUpdateStrategy().getQueueStrategy().getGroupSize();
+    } else if (settings.getUpdateStrategy().isSetBatchStrategy()) {
+      totalInstancesFromGroups = settings.getUpdateStrategy().getBatchStrategy().getGroupSize();
+    } else if (settings.getUpdateStrategy().isSetVarBatchStrategy()) {
+      VariableBatchJobUpdateStrategy strategy = settings.getUpdateStrategy().getVarBatchStrategy();
+
+      if (strategy.getGroupSizes().stream().anyMatch(x -> x <= 0)) {
+        return invalidRequest(INVALID_GROUP_SIZE);
+      }
+
+      totalInstancesFromGroups = strategy.getGroupSizes().stream().reduce(0, Integer::sum);
+    } else {
+      return invalidRequest(UNKNOWN_UPDATE_STRATEGY);
+    }
+
+    if (totalInstancesFromGroups <= 0) {
+      return invalidRequest(NO_INSTANCES_MODIFIED);
     }
 
     if (settings.getMaxPerInstanceFailures() < 0) {
@@ -1047,10 +1072,13 @@
   static final String NO_CRON = "Cron jobs may only be created/updated by calling scheduleCronJob.";
 
   @VisibleForTesting
+  static final String NO_INSTANCES_MODIFIED = "Update results in no instance being modified.";
+
+  @VisibleForTesting
   static final String NON_SERVICE_TASK = "Updates are not supported for non-service tasks.";
 
   @VisibleForTesting
-  static final String INVALID_GROUP_SIZE = "updateGroupSize must be positive.";
+  static final String INVALID_GROUP_SIZE = "All update group sizes must be positive.";
 
   @VisibleForTesting
   static final String INVALID_MAX_FAILED_INSTANCES = "maxFailedInstances must be non-negative.";
@@ -1077,6 +1105,9 @@
   static final String INVALID_INSTANCE_COUNT = "Instance count must be positive.";
 
   @VisibleForTesting
+  static final String UNKNOWN_UPDATE_STRATEGY = "Update strategy provided is unknown.";
+
+  @VisibleForTesting
   static final String INVALID_SLA_AWARE_UPDATE = "slaAware is true, but no task slaPolicy is "
       + "specified.";
 }
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
index bc8008e..306b89a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -37,6 +37,7 @@
 import org.apache.aurora.scheduler.updater.strategy.BatchStrategy;
 import org.apache.aurora.scheduler.updater.strategy.QueueStrategy;
 import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+import org.apache.aurora.scheduler.updater.strategy.VariableBatchStrategy;
 
 import static java.util.Objects.requireNonNull;
 
@@ -76,12 +77,29 @@
     public Update newUpdate(IJobUpdateInstructions instructions, boolean rollingForward) {
       requireNonNull(instructions);
       IJobUpdateSettings settings = instructions.getSettings();
+
       checkArgument(
           settings.getMinWaitInInstanceRunningMs() >= 0,
           "Min wait in running must be non-negative.");
-      checkArgument(
-          settings.getUpdateGroupSize() > 0,
-          "Update group size must be positive.");
+
+      if (settings.getUpdateStrategy().isSetBatchStrategy()) {
+        checkArgument(
+            settings.getUpdateStrategy().getBatchStrategy().getGroupSize() > 0,
+            GROUP_SIZES_INVALID);
+      } else if (settings.getUpdateStrategy().isSetVarBatchStrategy()) {
+        checkArgument(
+            settings.getUpdateStrategy().
+                getVarBatchStrategy().
+                getGroupSizes().
+                stream().
+                reduce(0, Integer::sum) > 0,
+            GROUP_SIZES_INVALID);
+
+      } else {
+        checkArgument(
+            settings.getUpdateStrategy().getQueueStrategy().getGroupSize() > 0,
+            GROUP_SIZES_INVALID);
+      }
 
       Set<Integer> currentInstances = expandInstanceIds(instructions.getInitialState());
       Set<Integer> desiredInstances = instructions.isSetDesiredState()
@@ -116,9 +134,26 @@
           ? updateOrdering
           : updateOrdering.reverse();
 
-      UpdateStrategy<Integer> strategy = settings.isWaitForBatchCompletion()
-          ? new BatchStrategy<>(updateOrder, settings.getUpdateGroupSize())
-          : new QueueStrategy<>(updateOrder, settings.getUpdateGroupSize());
+      UpdateStrategy<Integer> strategy;
+
+      // Note: Verification that the update strategy exists and is valid has already taken
+      // place when the scheduler receives the thrift call.
+      // TODO(rdelvalle): Consider combining Batch Update and Variable Batch update strategies.
+      if (settings.getUpdateStrategy().isSetBatchStrategy()) {
+        strategy = new BatchStrategy<>(
+            updateOrder,
+            settings.getUpdateStrategy().getBatchStrategy().getGroupSize());
+      } else if (settings.getUpdateStrategy().isSetVarBatchStrategy()) {
+        strategy = new VariableBatchStrategy<>(
+            updateOrder,
+            settings.getUpdateStrategy().getVarBatchStrategy().getGroupSizes(),
+            rollingForward);
+      } else {
+        strategy = new QueueStrategy<>(
+            updateOrder,
+            settings.getUpdateStrategy().getQueueStrategy().getGroupSize());
+      }
+
       JobUpdateStatus successStatus =
           rollingForward ? JobUpdateStatus.ROLLED_FORWARD : JobUpdateStatus.ROLLED_BACK;
       JobUpdateStatus failureStatus = rollingForward && settings.isRollbackOnFailure()
@@ -138,6 +173,8 @@
     static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) {
       return Updates.getInstanceIds(instanceGroups).asSet(DiscreteDomain.integers());
     }
+
+    static final String GROUP_SIZES_INVALID = "Update group size(s) must be positive.";
   }
 
   /**
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java b/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
index 855ea9c..96f4c6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
@@ -15,9 +15,9 @@
 
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Ordering;
 
 /**
@@ -43,10 +43,9 @@
 
   @Override
   public final Set<T> getNextGroup(Set<T> idle, Set<T> active) {
-    return FluentIterable
-        .from(ordering.sortedCopy(doGetNextGroup(idle, active)))
+    return ordering.sortedCopy(doGetNextGroup(idle, active)).stream()
         .limit(Math.max(0, maxActive - active.size()))
-        .toSet();
+        .collect(Collectors.toSet());
   }
 
   /**
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategy.java b/src/main/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategy.java
new file mode 100644
index 0000000..47c70a3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategy.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater.strategy;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An update strategy that will only add more work when the current active group is empty.
+ * Size of the groups are picked from the supplied list.
+ * Last element is picked multiple times if necessary.
+ *
+ * @param <T> Instance type.
+ */
+public class VariableBatchStrategy<T extends Comparable<T>> implements UpdateStrategy<T> {
+  private final Ordering<T> ordering;
+  protected final ImmutableList<Integer> groupSizes;
+  private final boolean rollingForward;
+  private Optional<Integer> totalModInstanceCount;
+
+  private static final Logger LOG = LoggerFactory.getLogger(VariableBatchStrategy.class);
+
+  /**
+   * Creates a variable active-limited strategy that applies an upper bound to all results.
+   *
+   * @param maxActiveGroups  List of Maximum group sizes. Each group size represents a step.
+   * {@link #getNextGroup(Set, Set)}.
+   */
+  public VariableBatchStrategy(
+      Ordering<T> ordering,
+      List<Integer> maxActiveGroups,
+      boolean rollingForward) {
+
+    this.ordering = Objects.requireNonNull(ordering);
+    this.rollingForward = rollingForward;
+
+    maxActiveGroups.forEach(x -> Preconditions.checkArgument(x > 0));
+
+    this.groupSizes = ImmutableList.copyOf(maxActiveGroups);
+    this.totalModInstanceCount = Optional.empty();
+  }
+
+  // Determine how far we're into the update based upon how many instances are waiting
+  // to be modified.
+  private int determineCurGroupSize(int remaining) {
+    // Calculate which groupIndex we are in by finding out how many instances we have left to update
+    int modified = totalModInstanceCount.get() - remaining;
+    int finalGroupSize = Iterables.getLast(groupSizes);
+
+    LOG.debug("Variable Batch Update progress: {} instances have been modified, "
+            + "{} instances remain unmodified, and {} overall instances to be modified.",
+        modified,
+        remaining,
+        totalModInstanceCount.get());
+
+    if (rollingForward) {
+      int sum = 0;
+      for (Integer groupSize : groupSizes) {
+        sum += groupSize;
+
+        if (sum > modified) {
+          return groupSize;
+        }
+      }
+      // Return last step when number of instances > sum of all groups
+      return finalGroupSize;
+    } else {
+      // To perform the update in reverse, we use the number of remaining tasks left to update
+      // instead of using the number of already modified instances. In a rollback, the remaining
+      // count represents the number of instances that were already modified while rolling forward
+      // and need to be reverted.
+      int curGroupSize = remaining;
+
+      for (Integer groupSize : groupSizes) {
+        // This handles an in between step. i.e.: updated instances = 4, update groups = [2,3]
+        // which results in update groups 2 and 2 rolling forward at the time of failure.
+        if (curGroupSize <= groupSize) {
+          return curGroupSize;
+        }
+
+        curGroupSize -= groupSize;
+      }
+
+      // Handle the case where number of instances update were
+      // greater than the sum of all update groups
+      // Calculate the size of the last update group size performed while rolling forward.
+      curGroupSize = curGroupSize % finalGroupSize;
+      if (curGroupSize == 0) {
+        return finalGroupSize;
+      } else {
+        return curGroupSize;
+      }
+    }
+  }
+
+  @Override
+  public final Set<T> getNextGroup(Set<T> idle, Set<T> active) {
+    // Get the size for the idle set on the first run only. This is representative of the number
+    // of overall instance modifications this update will trigger.
+    if (!totalModInstanceCount.isPresent()) {
+      totalModInstanceCount = Optional.of(idle.size());
+    }
+
+    // Limit group size to the current size of the group minus the number of instances currently
+    // being modified.
+    return ordering.sortedCopy(doGetNextGroup(idle, active)).stream()
+            .limit(Math.max(0, determineCurGroupSize(idle.size()) - active.size()))
+            .collect(Collectors.toSet());
+  }
+
+  /**
+   * Return a list of instances to be updated.
+   * Returns an empty list if the current active group has not completed.
+   *
+   * @param idle Idle instances, candidate for being updated.
+   * @param active Instances currently being updated.
+   * @return all idle instances to start updating.
+   */
+  Set<T> doGetNextGroup(Set<T> idle, Set<T> active) {
+    return active.isEmpty() ? idle : ImmutableSet.of();
+  }
+}
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index 34822bc..147313d 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -148,7 +148,7 @@
 
   def _job_update_request(self, config, instances=None, metadata=None):
     try:
-      settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances)
+      settings = UpdaterConfig(config.update_config()).to_thrift_update_settings(instances)
     except ValueError as e:
       raise self.UpdateConfigError(str(e))
 
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index 5c2d953..9b17c96 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -16,40 +16,52 @@
 from itertools import groupby
 from operator import itemgetter
 
+from pystachio import Empty, Choice
 from twitter.common import log
 
 from gen.apache.aurora.api.ttypes import JobUpdateSettings, Range
+from apache.aurora.config.schema.base import (
+ BatchUpdateStrategy as PystachioBatchUpdateStrategy,
+ QueueUpdateStrategy as PystachioQueueUpdateStrategy,
+ VariableBatchUpdateStrategy as PystachioVariableBatchUpdateStrategy
+)
+from apache.aurora.config.thrift import create_update_strategy_config
 
 
 class UpdaterConfig(object):
   MIN_PULSE_INTERVAL_SECONDS = 60
 
-  def __init__(self,
-               batch_size,
-               watch_secs,
-               max_per_shard_failures,
-               max_total_failures,
-               rollback_on_failure=True,
-               wait_for_batch_completion=False,
-               pulse_interval_secs=None,
-               sla_aware=None):
+  def __init__(self, config):
+    self.batch_size = config.batch_size().get()
+    self.watch_secs = config.watch_secs().get()
+    self.max_total_failures = config.max_total_failures().get()
+    self.max_per_instance_failures = config.max_per_shard_failures().get()
+    self.update_strategy = config.update_strategy()
+    self.sla_aware = config.sla_aware().get()
+    self.wait_for_batch_completion = config.wait_for_batch_completion().get()
+    self.rollback_on_failure = config.rollback_on_failure().get()
+    self.pulse_interval_secs = None
 
-    if batch_size <= 0:
+    # Override default values if they are provided.
+    if config.pulse_interval_secs() is not Empty:
+      self.pulse_interval_secs = config.pulse_interval_secs().get()
+
+    if self.batch_size <= 0:
       raise ValueError('Batch size should be greater than 0')
-    if watch_secs < 0:
+    if self.watch_secs < 0:
       raise ValueError('Watch seconds should be greater than or equal to 0')
-    if pulse_interval_secs is not None and pulse_interval_secs < self.MIN_PULSE_INTERVAL_SECONDS:
+    if (self.pulse_interval_secs is not None and
+            self.pulse_interval_secs < self.MIN_PULSE_INTERVAL_SECONDS):
       raise ValueError('Pulse interval seconds must be at least %s seconds.'
-                       % self.MIN_PULSE_INTERVAL_SECONDS)
-
-    self.batch_size = batch_size
-    self.watch_secs = watch_secs
-    self.max_total_failures = max_total_failures
-    self.max_per_instance_failures = max_per_shard_failures
-    self.rollback_on_failure = rollback_on_failure
-    self.wait_for_batch_completion = wait_for_batch_completion
-    self.pulse_interval_secs = pulse_interval_secs
-    self.sla_aware = sla_aware
+                      % self.MIN_PULSE_INTERVAL_SECONDS)
+    if self.wait_for_batch_completion and self.update_strategy is not Empty:
+      raise ValueError('Ambiguous update configuration. Cannot combine '
+                       'wait_batch_completion with an '
+                       'explicit update strategy.')
+    if self.batch_size > 1 and self.update_strategy is not Empty:
+      raise ValueError('Ambiguous update configuration. Cannot combine '
+                       'update strategy with batch size. Please set batch'
+                       'size inside of update strategy instead.')
 
   @classmethod
   def instances_to_ranges(cls, instances):
@@ -80,6 +92,25 @@
     Arguments:
     instances - optional list of instances to update.
     """
+
+    if self.update_strategy is Empty:
+      update_strategy = Choice([PystachioQueueUpdateStrategy,
+                                PystachioBatchUpdateStrategy,
+                                PystachioVariableBatchUpdateStrategy])
+      if self.wait_for_batch_completion:
+        self.update_strategy = update_strategy(
+          PystachioBatchUpdateStrategy(batch_size=self.batch_size))
+      else:
+        self.update_strategy = update_strategy(
+          PystachioQueueUpdateStrategy(batch_size=self.batch_size))
+    else:
+      unwrapped = self.update_strategy.unwrap()
+      if (isinstance(unwrapped, PystachioQueueUpdateStrategy) or
+          isinstance(unwrapped, PystachioBatchUpdateStrategy)):
+        self.batch_size = self.update_strategy.groupSize
+      elif isinstance(unwrapped, PystachioBatchUpdateStrategy):
+        self.batch_size = self.update_strategy.groupSizes[0]
+
     return JobUpdateSettings(
         updateGroupSize=self.batch_size,
         maxPerInstanceFailures=self.max_per_instance_failures,
@@ -88,6 +119,7 @@
         rollbackOnFailure=self.rollback_on_failure,
         waitForBatchCompletion=self.wait_for_batch_completion,
         updateOnlyTheseInstances=self.instances_to_ranges(instances) if instances else None,
+        updateStrategy=create_update_strategy_config(self.update_strategy),
         blockIfNoPulsesAfterMs=(self.pulse_interval_secs * 1000 if self.pulse_interval_secs
             else None),
         slaAware=self.sla_aware
diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py
index bf75660..5e35d93 100644
--- a/src/main/python/apache/aurora/config/schema/base.py
+++ b/src/main/python/apache/aurora/config/schema/base.py
@@ -28,6 +28,14 @@
   instance    = Required(Integer)
   hostname    = Required(String)
 
+class QueueUpdateStrategy(Struct):
+  batch_size = Default(Integer, 1)
+
+class BatchUpdateStrategy(Struct):
+  batch_size = Default(Integer, 1)
+
+class VariableBatchUpdateStrategy(Struct):
+  batch_sizes = Required(List(Integer))
 
 class UpdateConfig(Struct):
   batch_size                  = Default(Integer, 1)
@@ -38,6 +46,9 @@
   wait_for_batch_completion   = Default(Boolean, False)
   pulse_interval_secs         = Integer
   sla_aware                   = Default(Boolean, False)
+  update_strategy             = Choice([QueueUpdateStrategy,
+                                        BatchUpdateStrategy,
+                                        VariableBatchUpdateStrategy])
 
 
 class HttpHealthChecker(Struct):
diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py
index 2ffa5b6..c9632f2 100644
--- a/src/main/python/apache/aurora/config/thrift.py
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -18,12 +18,17 @@
 from pystachio import Empty, Ref
 from twitter.common.lang import Compatibility
 
-from apache.aurora.config.schema.base import AppcImage as PystachioAppcImage
-from apache.aurora.config.schema.base import Container as PystachioContainer
-from apache.aurora.config.schema.base import CoordinatorSlaPolicy as PystachioCoordinatorSlaPolicy
-from apache.aurora.config.schema.base import CountSlaPolicy as PystachioCountSlaPolicy
-from apache.aurora.config.schema.base import DockerImage as PystachioDockerImage
-from apache.aurora.config.schema.base import PercentageSlaPolicy as PystachioPercentageSlaPolicy
+from apache.aurora.config.schema.base import(
+  AppcImage as PystachioAppcImage,
+  BatchUpdateStrategy as PystachioBatchUpdateStrategy,
+  Container as PystachioContainer,
+  CoordinatorSlaPolicy as PystachioCoordinatorSlaPolicy,
+  CountSlaPolicy as PystachioCountSlaPolicy,
+  DockerImage as PystachioDockerImage,
+  PercentageSlaPolicy as PystachioPercentageSlaPolicy,
+  QueueUpdateStrategy as PystachioQueueUpdateStrategy,
+  VariableBatchUpdateStrategy as PystachioVariableBatchUpdateStrategy
+)
 from apache.aurora.config.schema.base import (
     Docker,
     HealthCheckConfig,
@@ -36,6 +41,7 @@
 from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME, GOOD_IDENTIFIER_PATTERN_PYTHON
 from gen.apache.aurora.api.ttypes import (
     AppcImage,
+    BatchJobUpdateStrategy,
     Constraint,
     Container,
     CoordinatorSlaPolicy,
@@ -49,6 +55,7 @@
     Image,
     JobConfiguration,
     JobKey,
+    JobUpdateStrategy,
     LimitConstraint,
     MesosContainer,
     Metadata,
@@ -56,9 +63,11 @@
     PartitionPolicy,
     PercentageSlaPolicy,
     Resource,
+    QueueJobUpdateStrategy,
     SlaPolicy,
     TaskConfig,
     TaskConstraint,
+    VariableBatchJobUpdateStrategy,
     ValueConstraint,
     Volume
 )
@@ -181,6 +190,37 @@
   raise InvalidConfig('If a container is specified it must set one type.')
 
 
+def create_update_strategy_config(update_strategy):
+  unwrapped = update_strategy.unwrap()
+  if unwrapped is Empty:
+    return JobUpdateStrategy(
+        queueStrategy=QueueJobUpdateStrategy(
+            groupSize=1),
+        batchStrategy=None,
+        varBatchStrategy=None)
+
+  if isinstance(unwrapped, PystachioQueueUpdateStrategy):
+    return JobUpdateStrategy(
+        queueStrategy=QueueJobUpdateStrategy(
+            groupSize=fully_interpolated(unwrapped.batch_size())),
+        batchStrategy=None,
+        varBatchStrategy=None)
+
+  if isinstance(unwrapped, PystachioBatchUpdateStrategy):
+    return JobUpdateStrategy(
+        queueStrategy=None,
+        batchStrategy=BatchJobUpdateStrategy(
+            groupSize=fully_interpolated(unwrapped.batch_size())),
+        varBatchStrategy=None)
+
+  if isinstance(unwrapped, PystachioVariableBatchUpdateStrategy):
+    return JobUpdateStrategy(
+        queueStrategy=None,
+        batchStrategy=None,
+        varBatchStrategy=VariableBatchJobUpdateStrategy(
+            groupSizes=fully_interpolated(unwrapped.batch_sizes())))
+
+
 def volumes_to_thrift(volumes):
   thrift_volumes = []
   for v in volumes:
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
index 3a93650..9354781 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractJobUpdateStoreTest.java
@@ -25,6 +25,7 @@
 import com.google.inject.Injector;
 import com.google.inject.Module;
 
+import org.apache.aurora.gen.BatchJobUpdateStrategy;
 import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobInstanceUpdateEvent;
 import org.apache.aurora.gen.JobUpdate;
@@ -37,6 +38,7 @@
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateState;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateStrategy;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.Metadata;
 import org.apache.aurora.gen.Range;
@@ -629,7 +631,9 @@
                 .setTask(config)))
         .setSettings(new JobUpdateSettings()
             .setBlockIfNoPulsesAfterMs(500)
-            .setUpdateGroupSize(1)
+            .setUpdateStrategy(
+                JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(1)))
+            .setUpdateGroupSize(1) // TODO(rdelvalle): Remove when thrift field deprecated.
             .setMaxPerInstanceFailures(1)
             .setMaxFailedInstances(1)
             .setMinWaitInInstanceRunningMs(200)
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DataCompatibilityTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DataCompatibilityTest.java
index 3372cec..f06704c 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/DataCompatibilityTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DataCompatibilityTest.java
@@ -304,7 +304,7 @@
     Map<String, ValueDifference<String>> modified = difference.entriesDiffering();
     if (!modified.isEmpty()) {
       error.append("Schema changes to Op(s): " + modified.keySet())
-          .append("\nThis check detects that changes occured, not how the schema changed.")
+          .append("\nThis check detects that changes occurred, not how the schema changed.")
           .append("\nSome guidelines for evolving schemas:")
           .append("\n  * Introducing fields: you must handle reading records that do not")
           .append("\n    yet have the field set.  This can be done with a backfill routine during")
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
index ddb9d06..44c4b48 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -40,9 +40,11 @@
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateStrategy;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.PercentageSlaPolicy;
+import org.apache.aurora.gen.QueueJobUpdateStrategy;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -256,11 +258,16 @@
         .setInstructions(new JobUpdateInstructions()
             .setInitialState(
                 ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig())))
-            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig())));
+            .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig()))
+            .setSettings(new JobUpdateSettings()));
     JobUpdate expectedUpdate = actualUpdate.deepCopy();
     expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
     expectedUpdate.getInstructions().getInitialState()
         .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
+    expectedUpdate.getInstructions()
+        .getSettings()
+        .setUpdateStrategy(
+            JobUpdateStrategy.queueStrategy(new QueueJobUpdateStrategy().setGroupSize(0)));
     SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
     builder.add(Edit.op(Op.saveJobUpdate(saveUpdate)));
     storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
index 4c1918f..a8f43ce 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotterImplIT.java
@@ -23,6 +23,7 @@
 import org.apache.aurora.common.util.testing.FakeBuildInfo;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.BatchJobUpdateStrategy;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.HostMaintenanceRequest;
@@ -40,6 +41,7 @@
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateState;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateStrategy;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.PercentageSlaPolicy;
@@ -143,12 +145,12 @@
                       .setTask(TASK_CONFIG.newBuilder())))
               .setSettings(new JobUpdateSettings()
                   .setBlockIfNoPulsesAfterMs(500)
-                  .setUpdateGroupSize(1)
+                  .setUpdateStrategy(
+                      JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(1)))
                   .setMaxPerInstanceFailures(1)
                   .setMaxFailedInstances(1)
                   .setMinWaitInInstanceRunningMs(200)
                   .setRollbackOnFailure(true)
-                  .setWaitForBatchCompletion(true)
                   .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)))))
           .setSummary(new JobUpdateSummary()
               .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 334fd5d..c4d27aa 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -45,6 +45,7 @@
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStrategy;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.ListBackupsResult;
@@ -54,6 +55,7 @@
 import org.apache.aurora.gen.PercentageSlaPolicy;
 import org.apache.aurora.gen.PulseJobUpdateResult;
 import org.apache.aurora.gen.QueryRecoveryResult;
+import org.apache.aurora.gen.QueueJobUpdateStrategy;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ReadOnlyScheduler;
 import org.apache.aurora.gen.Resource;
@@ -70,6 +72,7 @@
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.gen.VariableBatchJobUpdateStrategy;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -1475,15 +1478,46 @@
     control.replay();
 
     JobUpdateRequest updateRequest = buildServiceJobUpdateRequest();
-    updateRequest.getSettings().setUpdateGroupSize(0);
+    updateRequest.getSettings().setUpdateStrategy(
+        JobUpdateStrategy.queueStrategy(new QueueJobUpdateStrategy().setGroupSize(0)));
 
     assertEquals(
-        invalidResponse(SchedulerThriftInterface.INVALID_GROUP_SIZE),
+        invalidResponse(SchedulerThriftInterface.NO_INSTANCES_MODIFIED),
         thrift.startJobUpdate(updateRequest, AUDIT_MESSAGE));
     assertEquals(0L, statsProvider.getLongValue(START_JOB_UPDATE));
   }
 
   @Test
+  public void testStartUpdateFailsInvalidGroupSizeVariableBatch() throws Exception {
+    control.replay();
+
+    JobUpdateRequest updateRequest = buildServiceJobUpdateRequest();
+    updateRequest.getSettings().setUpdateStrategy(
+            JobUpdateStrategy.varBatchStrategy(
+                    new VariableBatchJobUpdateStrategy().setGroupSizes(ImmutableList.of(1, 0, 4))));
+
+    assertEquals(
+            invalidResponse(SchedulerThriftInterface.INVALID_GROUP_SIZE),
+            thrift.startJobUpdate(updateRequest, AUDIT_MESSAGE));
+    assertEquals(0L, statsProvider.getLongValue(START_JOB_UPDATE));
+  }
+
+  @Test
+  public void testStartUpdateFailsInvalidGroupsSum() throws Exception {
+    control.replay();
+
+    JobUpdateRequest updateRequest = buildServiceJobUpdateRequest();
+    updateRequest.getSettings().setUpdateStrategy(
+            JobUpdateStrategy.varBatchStrategy(
+                    new VariableBatchJobUpdateStrategy().setGroupSizes(ImmutableList.of())));
+
+    assertEquals(
+            invalidResponse(SchedulerThriftInterface.NO_INSTANCES_MODIFIED),
+            thrift.startJobUpdate(updateRequest, AUDIT_MESSAGE));
+    assertEquals(0L, statsProvider.getLongValue(START_JOB_UPDATE));
+  }
+
+  @Test
   public void testStartUpdateFailsInvalidMaxInstanceFailures() throws Exception {
     control.replay();
 
@@ -1962,6 +1996,8 @@
   private static JobUpdateSettings buildJobUpdateSettings() {
     return new JobUpdateSettings()
         .setUpdateGroupSize(10)
+        .setUpdateStrategy(
+            JobUpdateStrategy.queueStrategy(new QueueJobUpdateStrategy().setGroupSize(10)))
         .setMaxFailedInstances(2)
         .setMaxPerInstanceFailures(1)
         .setMinWaitInInstanceRunningMs(15000)
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index c96def1..e8febc4 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -45,6 +45,7 @@
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.common.util.TruncatedBinaryBackoff;
+import org.apache.aurora.gen.BatchJobUpdateStrategy;
 import org.apache.aurora.gen.CountSlaPolicy;
 import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdate;
@@ -56,8 +57,10 @@
 import org.apache.aurora.gen.JobUpdateSettings;
 import org.apache.aurora.gen.JobUpdateState;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateStrategy;
 import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.Metadata;
+import org.apache.aurora.gen.QueueJobUpdateStrategy;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
@@ -806,8 +809,8 @@
 
     JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
     builder.getInstructions().getSettings()
-        .setWaitForBatchCompletion(true)
-        .setUpdateGroupSize(2);
+        .setUpdateStrategy(
+            JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(2)));
     IJobUpdate update = IJobUpdate.build(builder);
     insertInitialTasks(update);
 
@@ -1105,7 +1108,10 @@
     control.replay();
 
     JobUpdate update = makeJobUpdate().newBuilder();
-    update.getInstructions().getSettings().setUpdateGroupSize(-1);
+    update.getInstructions()
+        .getSettings()
+        .setUpdateStrategy(
+            JobUpdateStrategy.queueStrategy(new QueueJobUpdateStrategy().setGroupSize(-1)));
     expectInvalid(update);
 
     update = makeJobUpdate().newBuilder();
@@ -1121,7 +1127,8 @@
 
     control.replay();
 
-    IJobUpdate update = setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 2);
+    IJobUpdate update = setInstanceCount(
+        makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 2);
     insertInitialTasks(update);
 
     changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
@@ -1140,7 +1147,11 @@
       store.deleteAllUpdates();
 
       JobUpdate builder = update.newBuilder();
-      builder.getInstructions().getSettings().setUpdateGroupSize(0);
+      builder.getInstructions()
+          .getSettings()
+          .getUpdateStrategy()
+          .getQueueStrategy()
+          .setGroupSize(0);
       saveJobUpdate(store, IJobUpdate.build(builder), ROLLING_FORWARD);
     });
 
@@ -1351,8 +1362,8 @@
 
     JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
     builder.getInstructions().getSettings()
-        .setWaitForBatchCompletion(true)
-        .setUpdateGroupSize(2);
+        .setUpdateStrategy(
+            JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(2)));
     IJobUpdate update = IJobUpdate.build(builder);
     insertInitialTasks(update);
 
@@ -1405,8 +1416,8 @@
 
     JobUpdate builder = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG)).newBuilder();
     builder.getInstructions().getSettings()
-        .setWaitForBatchCompletion(true)
-        .setUpdateGroupSize(2);
+        .setUpdateStrategy(
+            JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(2)));
     IJobUpdate update = IJobUpdate.build(builder);
     insertInitialTasks(update);
 
@@ -1831,7 +1842,9 @@
                 .setTask(newConfig.newBuilder())
                 .setInstances(ImmutableSet.of(new Range(0, 2))))
             .setSettings(new JobUpdateSettings()
-                .setUpdateGroupSize(updateGroupSize)
+                .setUpdateStrategy(
+                    JobUpdateStrategy.queueStrategy(
+                        new QueueJobUpdateStrategy().setGroupSize(updateGroupSize)))
                 .setRollbackOnFailure(true)
                 .setMinWaitInInstanceRunningMs(WATCH_TIMEOUT.as(Time.MILLISECONDS).intValue())
                 .setUpdateOnlyTheseInstances(ImmutableSet.of())));
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/UpdateFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/updater/UpdateFactoryImplTest.java
index 01bdcf1..b195fb1 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/UpdateFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/UpdateFactoryImplTest.java
@@ -22,6 +22,8 @@
 import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStrategy;
+import org.apache.aurora.gen.QueueJobUpdateStrategy;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
@@ -46,7 +48,8 @@
               .setMaxFailedInstances(1)
               .setMaxPerInstanceFailures(1)
               .setMinWaitInInstanceRunningMs(100)
-              .setUpdateGroupSize(2)
+              .setUpdateStrategy(
+                  JobUpdateStrategy.queueStrategy(new QueueJobUpdateStrategy().setGroupSize(2)))
               .setUpdateOnlyTheseInstances(ImmutableSet.of())));
 
   private UpdateFactory factory;
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategyTest.java b/src/test/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategyTest.java
new file mode 100644
index 0000000..8126b99
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/strategy/VariableBatchStrategyTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater.strategy;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+import org.junit.Test;
+
+import static com.google.common.collect.ImmutableSet.of;
+
+import static org.junit.Assert.assertEquals;
+
+public class VariableBatchStrategyTest {
+
+  private static final Ordering<Integer> ORDERING = Ordering.natural();
+  private static final Set<Integer> EMPTY = of();
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadParameter() {
+    new VariableBatchStrategy<>(ORDERING, ImmutableList.of(0), true);
+  }
+
+  @Test
+  public void testNoWorkToDo() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(2),
+        true);
+    assertEquals(EMPTY, strategy.getNextGroup(EMPTY, of(0, 1)));
+    assertEquals(EMPTY, strategy.getNextGroup(EMPTY, EMPTY));
+  }
+
+  @Test
+  public void testVariableBatchCompletion() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(2),
+        true);
+    assertEquals(EMPTY, strategy.getNextGroup(of(2, 3), of(0, 1)));
+    assertEquals(EMPTY, strategy.getNextGroup(of(2, 3), of(1)));
+    assertEquals(of(2, 3), strategy.getNextGroup(of(2, 3), EMPTY));
+  }
+
+  @Test
+  public void testBatchesIgnoreInstanceValues() {
+    // Batches are defined as groups of instances, not partitioned based on the instance ID values.
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(2),
+        true);
+    assertEquals(of(0, 1), strategy.getNextGroup(of(0, 1, 2, 3), EMPTY));
+    assertEquals(of(1, 2), strategy.getNextGroup(of(1, 2, 3), EMPTY));
+    assertEquals(of(2, 3), strategy.getNextGroup(of(2, 3), EMPTY));
+    assertEquals(of(3, 8), strategy.getNextGroup(of(3, 8), EMPTY));
+  }
+
+  @Test
+  public void testExhausted() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(3),
+        true);
+    assertEquals(of(0, 1, 2), strategy.getNextGroup(of(0, 1, 2), EMPTY));
+    assertEquals(of(0, 1), strategy.getNextGroup(of(0, 1), EMPTY));
+    assertEquals(of(1), strategy.getNextGroup(of(1), EMPTY));
+  }
+
+  @Test
+  public void testActiveTooLarge() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(2),
+        true);
+    assertEquals(EMPTY, strategy.getNextGroup(of(0, 1, 2), of(3, 4, 5)));
+  }
+
+  @Test
+  public void testIncreasingGroupSizes() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(1, 2, 3),
+        true);
+    assertEquals(of(0), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5), EMPTY));
+    assertEquals(of(1, 2), strategy.getNextGroup(of(1, 2, 3, 4, 5), EMPTY));
+    assertEquals(of(3, 4, 5), strategy.getNextGroup(of(3, 4, 5), EMPTY));
+  }
+
+  @Test
+  public void testDecreasingGroupSizes() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(3, 2, 1),
+        true);
+    assertEquals(of(0, 1, 2), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5), EMPTY));
+    assertEquals(of(3, 4), strategy.getNextGroup(of(3, 4, 5), EMPTY));
+    assertEquals(of(5), strategy.getNextGroup(of(5), EMPTY));
+  }
+
+  @Test
+  public void testSeeSawGroupSizes() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(1, 3, 2, 4),
+        true);
+    assertEquals(of(0), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(1, 2, 3), strategy.getNextGroup(of(1, 2, 3, 4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(4, 5), strategy.getNextGroup(of(4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(6, 7, 8, 9), strategy.getNextGroup(of(6, 7, 8, 9), EMPTY));
+  }
+
+  @Test
+  public void testMoreInstancesThanSumOfGroupSizes() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING,
+        ImmutableList.of(1, 2),
+        true);
+    assertEquals(of(0), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(1, 2), strategy.getNextGroup(of(1, 2, 3, 4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(3, 4), strategy.getNextGroup(of(3, 4, 5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(5, 6), strategy.getNextGroup(of(5, 6, 7, 8, 9), EMPTY));
+    assertEquals(of(7, 8), strategy.getNextGroup(of(7, 8, 9), EMPTY));
+    assertEquals(of(9), strategy.getNextGroup(of(9), EMPTY));
+  }
+
+  @Test
+  public void testRollback() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING.reverse(),
+        ImmutableList.of(1, 2, 3),
+        false);
+    assertEquals(of(3, 4, 5), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5), EMPTY));
+    assertEquals(of(1, 2), strategy.getNextGroup(of(0, 1, 2), EMPTY));
+    assertEquals(of(0), strategy.getNextGroup(of(0), EMPTY));
+  }
+
+  @Test
+  public void testRollbackMidWay() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING.reverse(),
+        ImmutableList.of(1, 2, 3),
+        false);
+    assertEquals(of(1, 2), strategy.getNextGroup(of(0, 1, 2), EMPTY));
+    assertEquals(of(0), strategy.getNextGroup(of(0), EMPTY));
+  }
+
+  @Test
+  public void testRollbackOverflow() {
+    UpdateStrategy<Integer> strategy = new VariableBatchStrategy<>(ORDERING.reverse(),
+        ImmutableList.of(1, 2),
+        false);
+    assertEquals(of(5, 6), strategy.getNextGroup(of(0, 1, 2, 3, 4, 5, 6), EMPTY));
+    assertEquals(of(4, 3), strategy.getNextGroup(of(0, 1, 2, 3, 4), EMPTY));
+    assertEquals(of(2, 1), strategy.getNextGroup(of(0, 1, 2), EMPTY));
+    assertEquals(of(0), strategy.getNextGroup(of(0), EMPTY));
+  }
+
+}
diff --git a/src/test/python/apache/aurora/client/api/test_api.py b/src/test/python/apache/aurora/client/api/test_api.py
index 9d6d9de..3e3a1a5 100644
--- a/src/test/python/apache/aurora/client/api/test_api.py
+++ b/src/test/python/apache/aurora/client/api/test_api.py
@@ -32,6 +32,8 @@
     JobUpdateRequest,
     JobUpdateSettings,
     JobUpdateStatus,
+    JobUpdateStrategy,
+    QueueJobUpdateStrategy,
     Resource,
     Response,
     ResponseCode,
@@ -83,11 +85,16 @@
   def create_update_settings(cls):
     return JobUpdateSettings(
         updateGroupSize=1,
+        updateStrategy=JobUpdateStrategy(
+          queueStrategy=QueueJobUpdateStrategy(groupSize=1),
+          batchStrategy=None,
+          varBatchStrategy=None),
         maxPerInstanceFailures=2,
         maxFailedInstances=1,
         minWaitInInstanceRunningMs=50 * 1000,
         rollbackOnFailure=True,
-        waitForBatchCompletion=False)
+        waitForBatchCompletion=False,
+        slaAware=False)
 
   @classmethod
   def create_update_request(cls, task_config):
@@ -99,12 +106,14 @@
   @classmethod
   def mock_job_config(cls, error=None):
     config = create_autospec(spec=AuroraConfig, instance=True)
-    mock_get = create_autospec(spec=UpdateConfig, instance=True)
-    mock_get.get.return_value = cls.UPDATE_CONFIG
+    update_config = UpdateConfig(batch_size=1,
+                                 watch_secs=50,
+                                 max_per_shard_failures=2,
+                                 max_total_failures=1)
     if error:
       config.update_config.side_effect = error
     else:
-      config.update_config.return_value = mock_get
+      config.update_config.return_value = update_config
     mock_task_config = create_autospec(spec=JobConfiguration, instance=True)
     mock_task_config.taskConfig = TaskConfig()
     config.job.return_value = mock_task_config
diff --git a/src/test/python/apache/aurora/client/api/test_updater_util.py b/src/test/python/apache/aurora/client/api/test_updater_util.py
index 48926c3..bb16f11 100644
--- a/src/test/python/apache/aurora/client/api/test_updater_util.py
+++ b/src/test/python/apache/aurora/client/api/test_updater_util.py
@@ -12,16 +12,45 @@
 # limitations under the License.
 #
 import unittest
+import copy
 
 from pytest import raises
+from pystachio import Choice
 
+from gen.apache.aurora.api.ttypes import (
+  JobUpdateSettings,
+  JobUpdateStrategy,
+  VariableBatchJobUpdateStrategy,
+  BatchJobUpdateStrategy,
+  QueueJobUpdateStrategy
+)
 from apache.aurora.client.api import UpdaterConfig
+from apache.aurora.config.schema.base import UpdateConfig
+from apache.aurora.config.schema.base import (
+  BatchUpdateStrategy as PystachioBatchUpdateStrategy,
+  QueueUpdateStrategy as PystachioQueueUpdateStrategy,
+  VariableBatchUpdateStrategy as PystachioVariableBatchUpdateStrategy
+)
 
 from gen.apache.aurora.api.ttypes import Range
 
 
-class TestRangeConversion(unittest.TestCase):
-  """Job instance ID to range conversion."""
+class TestUpdaterUtil(unittest.TestCase):
+
+  EXPECTED_JOB_UPDATE_SETTINGS = JobUpdateSettings(
+    blockIfNoPulsesAfterMs=None,
+    updateOnlyTheseInstances=None,
+    slaAware=False,
+    maxPerInstanceFailures=0,
+    waitForBatchCompletion=False,
+    rollbackOnFailure=True,
+    minWaitInInstanceRunningMs=45000,
+    updateGroupSize=1,
+    maxFailedInstances=0)
+
+  UPDATE_STRATEGIES = Choice([PystachioQueueUpdateStrategy,
+                            PystachioBatchUpdateStrategy,
+                            PystachioVariableBatchUpdateStrategy])
 
   def test_multiple_ranges(self):
     """Test multiple ranges."""
@@ -46,15 +75,122 @@
     assert UpdaterConfig.instances_to_ranges([]) is None, "Result must be None."
 
   def test_pulse_interval_secs(self):
-    config = UpdaterConfig(1, 1, 1, 1, pulse_interval_secs=60)
+    config = UpdaterConfig(
+      UpdateConfig(batch_size=1,
+                   watch_secs=1,
+                   max_per_shard_failures=1,
+                   max_total_failures=1,
+                   pulse_interval_secs=60))
     assert 60000 == config.to_thrift_update_settings().blockIfNoPulsesAfterMs
 
   def test_pulse_interval_unset(self):
-    config = UpdaterConfig(1, 1, 1, 1)
+    config = UpdaterConfig(
+      UpdateConfig(batch_size=1, watch_secs=1, max_per_shard_failures=1, max_total_failures=1))
     assert config.to_thrift_update_settings().blockIfNoPulsesAfterMs is None
 
   def test_pulse_interval_too_low(self):
     threshold = UpdaterConfig.MIN_PULSE_INTERVAL_SECONDS
     with raises(ValueError) as e:
-      UpdaterConfig(1, 1, 1, 1, pulse_interval_secs=threshold - 1)
+      UpdaterConfig(UpdateConfig(batch_size=1,
+                                 watch_secs=1,
+                                 max_per_shard_failures=1,
+                                 max_total_failures=1,
+                                 pulse_interval_secs=threshold - 1))
     assert 'Pulse interval seconds must be at least %s seconds.' % threshold in e.value.message
+
+  def test_to_thrift_update_settings_strategy(self):
+
+    """Test to_thrift produces an expected thrift update settings configuration
+       from a Pystachio update object.
+    """
+
+    config = UpdaterConfig(
+      UpdateConfig(
+        update_strategy=self.UPDATE_STRATEGIES(
+          PystachioVariableBatchUpdateStrategy(batch_sizes=[1, 2, 3, 4]))))
+
+    thrift_update_config = config.to_thrift_update_settings()
+
+    update_settings = copy.deepcopy(self.EXPECTED_JOB_UPDATE_SETTINGS)
+
+    update_settings.updateStrategy = JobUpdateStrategy(
+      batchStrategy=None,
+      queueStrategy=None,
+      varBatchStrategy=VariableBatchJobUpdateStrategy(groupSizes=(1, 2, 3, 4)))
+
+    assert thrift_update_config == update_settings
+
+  def test_to_thrift_update_settings_no_strategy_queue(self):
+
+    """Test to_thrift produces an expected thrift update settings configuration
+       from a Pystachio update object that doesn't include an update strategy.
+
+       The configuration in this test should be converted to a
+       QueueJobUpdateStrategy.
+    """
+
+    config = UpdaterConfig(UpdateConfig())
+
+    thrift_update_config = config.to_thrift_update_settings()
+
+    update_settings = copy.deepcopy(self.EXPECTED_JOB_UPDATE_SETTINGS)
+    update_settings.updateStrategy = JobUpdateStrategy(
+        batchStrategy=None,
+        queueStrategy=QueueJobUpdateStrategy(groupSize=1),
+        varBatchStrategy=None)
+
+    assert thrift_update_config == update_settings
+
+  def test_to_thrift_update_settings_no_strategy_batch(self):
+
+    """Test to_thrift produces an expected thrift update settings configuration
+       from a Pystachio update object that doesn't include an update strategy.
+
+       The configuration in this test should be converted to a
+       BatchJobUpdateStrategy.
+    """
+
+    config = UpdaterConfig(UpdateConfig(wait_for_batch_completion=True))
+
+    thrift_update_config = config.to_thrift_update_settings()
+
+    update_settings = copy.deepcopy(self.EXPECTED_JOB_UPDATE_SETTINGS)
+    update_settings.updateStrategy = JobUpdateStrategy(
+        batchStrategy=BatchJobUpdateStrategy(groupSize=1),
+        queueStrategy=None,
+        varBatchStrategy=None)
+    update_settings.waitForBatchCompletion = True
+
+    assert thrift_update_config == update_settings
+
+  def test_wait_for_batch_completion_and_update_strategy(self):
+
+    """Test setting wait_for_batch_completion along with an update strategy.
+       This combination should result in a fast fail.
+    """
+
+    with raises(ValueError) as e:
+      UpdaterConfig(UpdateConfig(wait_for_batch_completion=True,
+                                 update_strategy=self.UPDATE_STRATEGIES(
+                                     PystachioBatchUpdateStrategy(
+                                         batch_size=3))))
+
+    assert ('Ambiguous update configuration. Cannot combine '
+            'wait_batch_completion with an '
+            'explicit update strategy.' in e.value.message)
+
+  def test_batch_size_and_update_strategy(self):
+
+    """Test setting a batch size along with an update strategy.
+       This combination should result in a fast fail.
+    """
+
+    with raises(ValueError) as e:
+      UpdaterConfig(UpdateConfig(batch_size=2,
+                                 update_strategy=self.UPDATE_STRATEGIES(
+                                     PystachioBatchUpdateStrategy(
+                                         batch_size=3))))
+
+    assert ('Ambiguous update configuration. Cannot combine '
+            'update strategy with batch size. Please set batch'
+            'size inside of update strategy instead.' in e.value.message)
diff --git a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
index 3876767..1d1123c 100644
--- a/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
+++ b/src/test/resources/org/apache/aurora/scheduler/storage/durability/goldens/current/saveJobUpdate
@@ -488,6 +488,17 @@
                   },
                   "10": {
                     "tf": 1
+                  },
+                  "11": {
+                    "rec": {
+                      "2": {
+                        "rec": {
+                          "1": {
+                            "i32": 2
+                          }
+                        }
+                      }
+                    }
                   }
                 }
               }
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
index bd4d4a1..877dbfd 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
@@ -105,6 +105,7 @@
 
 update_config = UpdateConfig(watch_secs=0, batch_size=2)
 update_config_watch_secs = UpdateConfig(watch_secs=10, batch_size=2)
+update_config_var_batch = UpdateConfig(update_strategy = VariableBatchUpdateStrategy(batch_sizes = [1,2]))
 health_check_config = HealthCheckConfig(initial_interval_secs=5, interval_secs=1)
 shell_health_check_config = HealthCheckConfig(
   health_checker = HealthCheckerConfig(
@@ -145,6 +146,10 @@
     update_config = update_config_watch_secs
   ).bind(profile=DefaultProfile()),
   job(
+    name = 'http_example_var_batch_update',
+    update_config = update_config_var_batch
+  ).bind(profile=DefaultProfile()),
+  job(
     name = 'http_example_revocable',
     tier = 'revocable'
   ).bind(profile=DefaultProfile()),
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example_bad_healthcheck.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example_bad_healthcheck.aurora
index c826a54..b9bf6f8 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example_bad_healthcheck.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example_bad_healthcheck.aurora
@@ -52,6 +52,7 @@
 
 update_config = UpdateConfig(watch_secs=0, batch_size=2)
 update_config_watch_secs = UpdateConfig(watch_secs=10, batch_size=2)
+update_config_var_batch = UpdateConfig(update_strategy = VariableBatchUpdateStrategy(batch_sizes = [1,2]))
 # "I am going to fail" config.
 shell_config = ShellHealthChecker(
   # This shell validates two things:
@@ -87,6 +88,10 @@
     update_config = update_config_watch_secs
   ).bind(profile=DefaultProfile()),
   job(
+    name = 'http_example_var_batch_update',
+    update_config = update_config_watch_secs
+  ).bind(profile=DefaultProfile()),
+  job(
     name = 'http_example_revocable',
     tier = 'revocable'
   ).bind(profile=DefaultProfile()),
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
index b85907a..c886350 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
@@ -49,6 +49,7 @@
 
 update_config = UpdateConfig(watch_secs=0, batch_size=3)
 update_config_watch_secs = UpdateConfig(watch_secs=10, batch_size=3)
+update_config_var_batch = UpdateConfig(update_strategy = VariableBatchUpdateStrategy(batch_sizes = [1,2]))
 health_check_config = HealthCheckConfig(initial_interval_secs=5, interval_secs=2, min_consecutive_successes=15)
 shell_health_check_config = HealthCheckConfig(
   health_checker = HealthCheckerConfig(
@@ -79,6 +80,11 @@
     update_config = update_config_watch_secs
   ).bind(profile=DefaultProfile()),
   job(
+    name = 'http_example_var_batch_update',
+    update_config = update_config_var_batch,
+    task = test_task
+  ).bind(profile=DefaultProfile()),
+  job(
     name = 'http_example_revocable',
     tier = 'revocable'
   ).bind(profile=DefaultProfile()),
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 288590d..b417280 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -364,7 +364,6 @@
   # Check that the update ended in ROLLED_FORWARD state.  Assumes the status is the last column.
   assert_update_state_by_id $_jobkey $_update_id 'ROLLED_FORWARD'
 }
-
 test_update_fail() {
   local _jobkey=$1 _config=$2 _cluster=$3  _bad_healthcheck_config=$4
   shift 4
@@ -882,6 +881,7 @@
 TEST_JOB=http_example
 TEST_MAINTENANCE_JOB=http_example_maintenance
 TEST_JOB_WATCH_SECS=http_example_watch_secs
+TEST_JOB_VAR_BATCH_UPDATE=http_example_var_batch_update
 TEST_JOB_REVOCABLE=http_example_revocable
 TEST_JOB_GPU=http_example_gpu
 TEST_JOB_DOCKER=http_example_docker
@@ -918,6 +918,8 @@
 
 TEST_JOB_WATCH_SECS_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_WATCH_SECS")
 
+TEST_JOB_VAR_BATCH_UPDATE_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_VAR_BATCH_UPDATE")
+
 TEST_JOB_REVOCABLE_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_REVOCABLE")
 
 TEST_JOB_GPU_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_GPU")
@@ -959,6 +961,7 @@
   $TEST_JOB_COORDINATOR_SLA
 )
 
+
 TEST_JOB_KILL_MESSAGE_ARGS=("${TEST_JOB_ARGS[@]}" "--message='Test message'")
 
 trap collect_result EXIT
@@ -974,6 +977,8 @@
 test_version
 test_http_example "${TEST_JOB_ARGS[@]}"
 test_http_example "${TEST_JOB_WATCH_SECS_ARGS[@]}"
+# TODO(rdelvalle): Add verification that each batch has the right number of active instances.
+test_http_example "${TEST_JOB_VAR_BATCH_UPDATE_ARGS[@]}"
 test_health_check
 
 test_mesos_maintenance "${TEST_MAINTENANCE_JOB_ARGS[@]}"
@@ -986,6 +991,7 @@
 
 test_http_example "${TEST_JOB_DOCKER_ARGS[@]}"
 
+
 setup_image_stores
 test_appc_unified
 test_docker_unified
diff --git a/ui/src/main/js/components/UpdateSettings.js b/ui/src/main/js/components/UpdateSettings.js
index d7fbe00..13e98e9 100644
--- a/ui/src/main/js/components/UpdateSettings.js
+++ b/ui/src/main/js/components/UpdateSettings.js
@@ -1,34 +1,77 @@
 import moment from 'moment';
 import React from 'react';
 
+import { isNully } from '../utils/Common';
+
 export default function UpdateSettings({ update }) {
   const settings = update.update.instructions.settings;
+
   return (<div>
     <table className='update-settings'>
-      <tr>
-        <td>Batch Size</td>
-        <td>{settings.updateGroupSize}</td>
-      </tr>
-      <tr>
-        <td>Max Failures Per Instance</td>
-        <td>{settings.maxPerInstanceFailures}</td>
-      </tr>
-      <tr>
-        <td>Max Failed Instances</td>
-        <td>{settings.maxFailedInstances}</td>
-      </tr>
-      <tr>
-        <td>Minimum Waiting Time in Running</td>
-        <td>{moment.duration(settings.minWaitInInstanceRunningMs).humanize()}</td>
-      </tr>
-      <tr>
-        <td>Rollback On Failure?</td>
-        <td>{settings.rollbackOnFailure ? 'yes' : 'no'}</td>
-      </tr>
-      <tr>
-        <td>SLA-Aware?</td>
-        <td>{settings.slaAware ? 'yes' : 'no'}</td>
-      </tr>
+      <tbody>
+        <UpdateStrategy strategy={settings.updateStrategy} />
+        <tr>
+          <td>Max Failures Per Instance</td>
+          <td>{settings.maxPerInstanceFailures}</td>
+        </tr>
+        <tr>
+          <td>Max Failed Instances</td>
+          <td>{settings.maxFailedInstances}</td>
+        </tr>
+        <tr>
+          <td>Minimum Waiting Time in Running</td>
+          <td>{moment.duration(settings.minWaitInInstanceRunningMs).humanize()}</td>
+        </tr>
+        <tr>
+          <td>Rollback On Failure?</td>
+          <td>{settings.rollbackOnFailure ? 'yes' : 'no'}</td>
+        </tr>
+        <tr>
+          <td>SLA-Aware?</td>
+          <td>{settings.slaAware ? 'yes' : 'no'}</td>
+        </tr>
+      </tbody>
     </table>
   </div>);
 }
+
+// ESLint doesn't like React's new adjacent elements, so we need to disable it here
+/* eslint-disable */
+function UpdateStrategy({ strategy }) {
+  if (isNully(strategy)) {
+    return null;
+  }
+
+  if (!isNully(strategy.queueStrategy)) {
+    return [<tr>
+      <td>Strategy</td>
+      <td>Queue</td>
+    </tr>,
+    <tr>
+      <td>Max Parallel Updates</td>
+      <td>{ strategy.queueStrategy.groupSize }</td>
+    </tr>];
+  } else if (!isNully(strategy.batchStrategy)) {
+    return [<tr>
+      <td>Strategy</td>
+      <td>Batch</td>
+    </tr>,
+    <tr>
+      <td>Batch Size</td>
+      <td>{ strategy.batchStrategy.groupSize }</td>
+    </tr>];
+  } else if (!isNully(strategy.varBatchStrategy)) {
+    return [<tr>
+      <td>Strategy</td>
+      <td>Variable Batch</td>
+    </tr>,
+    <tr>
+      <td>Batch Sizes</td>
+      <td>{ strategy.varBatchStrategy.groupSizes.toString() }</td>
+    </tr>];
+  }
+
+  return null;
+}
+
+/* eslint-enable */
diff --git a/ui/src/main/js/test-utils/UpdateBuilders.js b/ui/src/main/js/test-utils/UpdateBuilders.js
index f8ab4d8..ad294af 100644
--- a/ui/src/main/js/test-utils/UpdateBuilders.js
+++ b/ui/src/main/js/test-utils/UpdateBuilders.js
@@ -18,6 +18,16 @@
   USER
 };
 
+export const VarBatchUpdateStrategyBuilder = createBuilder({
+  groupSizes: [1, 2, 3]
+});
+
+export const UpdateStrategyBuilder = createBuilder({
+  batchStrategy: null,
+  queueStrategy: null,
+  varBatchStrategy: VarBatchUpdateStrategyBuilder.build()
+});
+
 export const UpdateSettingsBuilder = createBuilder({
   updateGroupSize: 1,
   maxPerInstanceFailures: 0,
@@ -25,7 +35,8 @@
   minWaitInInstanceRunningMs: 1,
   rollbackOnFailure: true,
   updateOnlyTheseInstances: [],
-  waitForBatchCompletion: false
+  waitForBatchCompletion: false,
+  updateStrategy: UpdateStrategyBuilder.build()
 });
 
 export const UpdateEventBuilder = createBuilder({