blob: 190d5227ca4173f7641d2863aff179b000d661f8 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Constraint;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.junit.Test;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
import static org.junit.Assert.assertEquals;
public class InstanceUpdaterTest {
private static final Optional<ITaskConfig> NO_CONFIG = Optional.empty();
private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig()
.setResources(ImmutableSet.of(numCpus(1.0))));
private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig()
.setProduction(true)
.setResources(ImmutableSet.of(numCpus(1.0))));
private static final ITaskConfig NEW_EXTRA_RESOURCES = ITaskConfig.build(new TaskConfig()
.setResources(ImmutableSet.of(numCpus(2.0))));
private static final ITaskConfig NEW_DIFFERENT_CONSTRAINTS = ITaskConfig.build(new TaskConfig()
.setConstraints(ImmutableSet.of(new Constraint("different",
TaskConstraint.value(new ValueConstraint(false, ImmutableSet.of("test")))))));
private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
private static final Amount<Long, Time> A_LONG_TIME = Amount.of(1L, Time.DAYS);
private static class TestFixture {
private final FakeClock clock;
private final InstanceUpdater updater;
private final TaskUtil taskUtil;
private Optional<IScheduledTask> task = Optional.empty();
TestFixture(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
this.clock = new FakeClock();
this.updater = new InstanceUpdater(newConfig, maxToleratedFailures, MIN_RUNNING_TIME, clock);
this.taskUtil = new TaskUtil(clock);
}
TestFixture(ITaskConfig newConfig, int maxToleratedFailures) {
this(Optional.of(newConfig), maxToleratedFailures);
}
void setActualState(ITaskConfig config) {
this.task = Optional.of(taskUtil.makeTask(config, PENDING));
}
void setActualStateAbsent() {
this.task = Optional.empty();
}
private Result changeStatusAndEvaluate(ScheduleStatus status) {
ScheduledTask builder = task.get().newBuilder();
if (builder.getStatus() != status) {
// Only add a task event if this is a state change.
builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
}
builder.setStatus(status);
task = Optional.of(IScheduledTask.build(builder));
return updater.evaluate(task);
}
void evaluateCurrentState(Result expectedResult) {
assertEquals(expectedResult, updater.evaluate(task));
}
void evaluate(
Result expectedResult,
ScheduleStatus status,
ScheduleStatus... statuses) {
assertEquals(expectedResult, changeStatusAndEvaluate(status));
for (ScheduleStatus s : statuses) {
assertEquals(expectedResult, changeStatusAndEvaluate(s));
}
}
void advanceTime(Amount<Long, Time> time) {
clock.advance(time);
}
}
@Test
public void testSuccessfulUpdate() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateWithResourceChange() {
TestFixture f = new TestFixture(NEW_EXTRA_RESOURCES, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW_EXTRA_RESOURCES);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateWithConstraintChange() {
TestFixture f = new TestFixture(NEW_DIFFERENT_CONSTRAINTS, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW_DIFFERENT_CONSTRAINTS);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateRetryOnTaskExit() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, FAILED);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateRetryFailure() {
TestFixture f = new TestFixture(NEW, 0);
f.setActualState(OLD);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.evaluate(Result.FAILED_TERMINATED, FAILED);
}
@Test
public void testNoopUpdate() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(NEW);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluate(SUCCEEDED, RUNNING);
}
@Test
public void testPointlessUpdate() {
TestFixture f = new TestFixture(NO_CONFIG, 1);
f.setActualStateAbsent();
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testNoOldConfig() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualStateAbsent();
f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testNoNewConfig() {
TestFixture f = new TestFixture(NO_CONFIG, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(SUCCEEDED, FINISHED);
f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testStuckInPending() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING);
f.advanceTime(A_LONG_TIME);
f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE);
}
@Test
public void testSlowToKill() {
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING);
f.advanceTime(A_LONG_TIME);
f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE);
f.setActualStateAbsent();
f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
f.advanceTime(A_LONG_TIME);
f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE);
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidInput() {
TestFixture f = new TestFixture(NEW, 1);
ScheduledTask noEvents = new TaskUtil(new FakeClock())
.makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.of());
f.updater.evaluate(Optional.of(IScheduledTask.build(noEvents)));
}
@Test
public void testOldTaskDies() {
// If the original task dies, the updater should not add a replacement if the task will be
// resuscitated. Only a task that has passed through KILLING will not be resuscitated.
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
// Task did not pass through KILLING, therefore will be rescheduled.
f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
}
@Test
public void testOldTaskDiesAndRescheduled() {
// Identical to testOldTaskDies, with the follow-through of rescheduling and updating.
// If the original task dies, the updater should not add a replacement if the task will be
// resuscitated. Only a task that has passed through KILLING will not be resuscitated.
TestFixture f = new TestFixture(NEW, 1);
f.setActualState(OLD);
// Task did not pass through KILLING, therefore will be rescheduled.
f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, PENDING);
f.setActualStateAbsent();
f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
f.setActualState(NEW);
f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
f.advanceTime(MIN_RUNNING_TIME);
f.evaluateCurrentState(SUCCEEDED);
}
static final class TaskUtil {
private final FakeClock clock;
TaskUtil(FakeClock clock) {
this.clock = Objects.requireNonNull(clock);
}
IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
List<TaskEvent> events = Lists.newArrayList();
if (status != PENDING) {
events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
}
if (Tasks.isTerminated(status) || status == KILLING) {
events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
}
events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
return IScheduledTask.build(
new ScheduledTask()
.setStatus(status)
.setTaskEvents(ImmutableList.copyOf(events))
.setAssignedTask(
new AssignedTask()
.setTask(config.newBuilder())));
}
}
}