| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.updater; |
| |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Optional; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.aurora.common.quantity.Amount; |
| import org.apache.aurora.common.quantity.Time; |
| import org.apache.aurora.common.util.testing.FakeClock; |
| import org.apache.aurora.gen.AssignedTask; |
| import org.apache.aurora.gen.Constraint; |
| import org.apache.aurora.gen.ScheduleStatus; |
| import org.apache.aurora.gen.ScheduledTask; |
| import org.apache.aurora.gen.TaskConfig; |
| import org.apache.aurora.gen.TaskConstraint; |
| import org.apache.aurora.gen.TaskEvent; |
| import org.apache.aurora.gen.ValueConstraint; |
| import org.apache.aurora.scheduler.base.Tasks; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.storage.entities.ITaskConfig; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.gen.Resource.numCpus; |
| import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; |
| import static org.apache.aurora.gen.ScheduleStatus.FAILED; |
| import static org.apache.aurora.gen.ScheduleStatus.FINISHED; |
| import static org.apache.aurora.gen.ScheduleStatus.KILLING; |
| import static org.apache.aurora.gen.ScheduleStatus.PENDING; |
| import static org.apache.aurora.gen.ScheduleStatus.RUNNING; |
| import static org.apache.aurora.gen.ScheduleStatus.STARTING; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED; |
| import static org.junit.Assert.assertEquals; |
| |
| public class InstanceUpdaterTest { |
| private static final Optional<ITaskConfig> NO_CONFIG = Optional.empty(); |
| |
| private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig() |
| .setResources(ImmutableSet.of(numCpus(1.0)))); |
| private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig() |
| .setProduction(true) |
| .setResources(ImmutableSet.of(numCpus(1.0)))); |
| private static final ITaskConfig NEW_EXTRA_RESOURCES = ITaskConfig.build(new TaskConfig() |
| .setResources(ImmutableSet.of(numCpus(2.0)))); |
| private static final ITaskConfig NEW_DIFFERENT_CONSTRAINTS = ITaskConfig.build(new TaskConfig() |
| .setConstraints(ImmutableSet.of(new Constraint("different", |
| TaskConstraint.value(new ValueConstraint(false, ImmutableSet.of("test"))))))); |
| |
| private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES); |
| private static final Amount<Long, Time> A_LONG_TIME = Amount.of(1L, Time.DAYS); |
| |
| private static class TestFixture { |
| private final FakeClock clock; |
| private final InstanceUpdater updater; |
| private final TaskUtil taskUtil; |
| private Optional<IScheduledTask> task = Optional.empty(); |
| |
| TestFixture(Optional<ITaskConfig> newConfig, int maxToleratedFailures) { |
| this.clock = new FakeClock(); |
| this.updater = new InstanceUpdater(newConfig, maxToleratedFailures, MIN_RUNNING_TIME, clock); |
| this.taskUtil = new TaskUtil(clock); |
| } |
| |
| TestFixture(ITaskConfig newConfig, int maxToleratedFailures) { |
| this(Optional.of(newConfig), maxToleratedFailures); |
| } |
| |
| void setActualState(ITaskConfig config) { |
| this.task = Optional.of(taskUtil.makeTask(config, PENDING)); |
| } |
| |
| void setActualStateAbsent() { |
| this.task = Optional.empty(); |
| } |
| |
| private Result changeStatusAndEvaluate(ScheduleStatus status) { |
| ScheduledTask builder = task.get().newBuilder(); |
| if (builder.getStatus() != status) { |
| // Only add a task event if this is a state change. |
| builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status)); |
| } |
| builder.setStatus(status); |
| |
| task = Optional.of(IScheduledTask.build(builder)); |
| return updater.evaluate(task); |
| } |
| |
| void evaluateCurrentState(Result expectedResult) { |
| assertEquals(expectedResult, updater.evaluate(task)); |
| } |
| |
| void evaluate( |
| Result expectedResult, |
| ScheduleStatus status, |
| ScheduleStatus... statuses) { |
| |
| assertEquals(expectedResult, changeStatusAndEvaluate(status)); |
| for (ScheduleStatus s : statuses) { |
| assertEquals(expectedResult, changeStatusAndEvaluate(s)); |
| } |
| } |
| |
| void advanceTime(Amount<Long, Time> time) { |
| clock.advance(time); |
| } |
| } |
| |
| @Test |
| public void testSuccessfulUpdate() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testUpdateWithResourceChange() { |
| TestFixture f = new TestFixture(NEW_EXTRA_RESOURCES, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW_EXTRA_RESOURCES); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testUpdateWithConstraintChange() { |
| TestFixture f = new TestFixture(NEW_DIFFERENT_CONSTRAINTS, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW_DIFFERENT_CONSTRAINTS); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testUpdateRetryOnTaskExit() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, FAILED); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testUpdateRetryFailure() { |
| TestFixture f = new TestFixture(NEW, 0); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.evaluate(Result.FAILED_TERMINATED, FAILED); |
| } |
| |
| @Test |
| public void testNoopUpdate() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluate(SUCCEEDED, RUNNING); |
| } |
| |
| @Test |
| public void testPointlessUpdate() { |
| TestFixture f = new TestFixture(NO_CONFIG, 1); |
| f.setActualStateAbsent(); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testNoOldConfig() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualStateAbsent(); |
| f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testNoNewConfig() { |
| TestFixture f = new TestFixture(NO_CONFIG, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(SUCCEEDED, FINISHED); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| @Test |
| public void testStuckInPending() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING); |
| f.advanceTime(A_LONG_TIME); |
| f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE); |
| } |
| |
| @Test |
| public void testSlowToKill() { |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING); |
| f.advanceTime(A_LONG_TIME); |
| f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE); |
| f.setActualStateAbsent(); |
| f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING); |
| f.advanceTime(A_LONG_TIME); |
| f.evaluateCurrentState(EVALUATE_ON_STATE_CHANGE); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testInvalidInput() { |
| TestFixture f = new TestFixture(NEW, 1); |
| ScheduledTask noEvents = new TaskUtil(new FakeClock()) |
| .makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.of()); |
| f.updater.evaluate(Optional.of(IScheduledTask.build(noEvents))); |
| } |
| |
| @Test |
| public void testOldTaskDies() { |
| // If the original task dies, the updater should not add a replacement if the task will be |
| // resuscitated. Only a task that has passed through KILLING will not be resuscitated. |
| |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| // Task did not pass through KILLING, therefore will be rescheduled. |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED); |
| } |
| |
| @Test |
| public void testOldTaskDiesAndRescheduled() { |
| // Identical to testOldTaskDies, with the follow-through of rescheduling and updating. |
| // If the original task dies, the updater should not add a replacement if the task will be |
| // resuscitated. Only a task that has passed through KILLING will not be resuscitated. |
| |
| TestFixture f = new TestFixture(NEW, 1); |
| f.setActualState(OLD); |
| |
| // Task did not pass through KILLING, therefore will be rescheduled. |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED); |
| f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, PENDING); |
| f.setActualStateAbsent(); |
| f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| f.setActualState(NEW); |
| f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING); |
| f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING); |
| f.advanceTime(MIN_RUNNING_TIME); |
| f.evaluateCurrentState(SUCCEEDED); |
| } |
| |
| static final class TaskUtil { |
| private final FakeClock clock; |
| |
| TaskUtil(FakeClock clock) { |
| this.clock = Objects.requireNonNull(clock); |
| } |
| |
| IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) { |
| List<TaskEvent> events = Lists.newArrayList(); |
| if (status != PENDING) { |
| events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING)); |
| } |
| if (Tasks.isTerminated(status) || status == KILLING) { |
| events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED)); |
| events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING)); |
| } |
| |
| events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status)); |
| |
| return IScheduledTask.build( |
| new ScheduledTask() |
| .setStatus(status) |
| .setTaskEvents(ImmutableList.copyOf(events)) |
| .setAssignedTask( |
| new AssignedTask() |
| .setTask(config.newBuilder()))); |
| } |
| } |
| } |