blob: 47c70a3c0dc4c75692cc67a9683dd8112c5b58a3 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater.strategy;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An update strategy that will only add more work when the current active group is empty.
* Size of the groups are picked from the supplied list.
* Last element is picked multiple times if necessary.
*
* @param <T> Instance type.
*/
public class VariableBatchStrategy<T extends Comparable<T>> implements UpdateStrategy<T> {
private final Ordering<T> ordering;
protected final ImmutableList<Integer> groupSizes;
private final boolean rollingForward;
private Optional<Integer> totalModInstanceCount;
private static final Logger LOG = LoggerFactory.getLogger(VariableBatchStrategy.class);
/**
* Creates a variable active-limited strategy that applies an upper bound to all results.
*
* @param maxActiveGroups List of Maximum group sizes. Each group size represents a step.
* {@link #getNextGroup(Set, Set)}.
*/
public VariableBatchStrategy(
Ordering<T> ordering,
List<Integer> maxActiveGroups,
boolean rollingForward) {
this.ordering = Objects.requireNonNull(ordering);
this.rollingForward = rollingForward;
maxActiveGroups.forEach(x -> Preconditions.checkArgument(x > 0));
this.groupSizes = ImmutableList.copyOf(maxActiveGroups);
this.totalModInstanceCount = Optional.empty();
}
// Determine how far we're into the update based upon how many instances are waiting
// to be modified.
private int determineCurGroupSize(int remaining) {
// Calculate which groupIndex we are in by finding out how many instances we have left to update
int modified = totalModInstanceCount.get() - remaining;
int finalGroupSize = Iterables.getLast(groupSizes);
LOG.debug("Variable Batch Update progress: {} instances have been modified, "
+ "{} instances remain unmodified, and {} overall instances to be modified.",
modified,
remaining,
totalModInstanceCount.get());
if (rollingForward) {
int sum = 0;
for (Integer groupSize : groupSizes) {
sum += groupSize;
if (sum > modified) {
return groupSize;
}
}
// Return last step when number of instances > sum of all groups
return finalGroupSize;
} else {
// To perform the update in reverse, we use the number of remaining tasks left to update
// instead of using the number of already modified instances. In a rollback, the remaining
// count represents the number of instances that were already modified while rolling forward
// and need to be reverted.
int curGroupSize = remaining;
for (Integer groupSize : groupSizes) {
// This handles an in between step. i.e.: updated instances = 4, update groups = [2,3]
// which results in update groups 2 and 2 rolling forward at the time of failure.
if (curGroupSize <= groupSize) {
return curGroupSize;
}
curGroupSize -= groupSize;
}
// Handle the case where number of instances update were
// greater than the sum of all update groups
// Calculate the size of the last update group size performed while rolling forward.
curGroupSize = curGroupSize % finalGroupSize;
if (curGroupSize == 0) {
return finalGroupSize;
} else {
return curGroupSize;
}
}
}
@Override
public final Set<T> getNextGroup(Set<T> idle, Set<T> active) {
// Get the size for the idle set on the first run only. This is representative of the number
// of overall instance modifications this update will trigger.
if (!totalModInstanceCount.isPresent()) {
totalModInstanceCount = Optional.of(idle.size());
}
// Limit group size to the current size of the group minus the number of instances currently
// being modified.
return ordering.sortedCopy(doGetNextGroup(idle, active)).stream()
.limit(Math.max(0, determineCurGroupSize(idle.size()) - active.size()))
.collect(Collectors.toSet());
}
/**
* Return a list of instances to be updated.
* Returns an empty list if the current active group has not completed.
*
* @param idle Idle instances, candidate for being updated.
* @param active Instances currently being updated.
* @return all idle instances to start updating.
*/
Set<T> doGetNextGroup(Set<T> idle, Set<T> active) {
return active.isEmpty() ? idle : ImmutableSet.of();
}
}