blob: 966678ec9bdc0c3aa271f76a979f070aeb8bc161 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.scheduler.updater.InstanceAction.ADD_TASK;
import static org.apache.aurora.scheduler.updater.InstanceAction.AWAIT_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.InstanceAction.KILL_TASK;
import static org.apache.aurora.scheduler.updater.InstanceAction.WATCH_TASK;
import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED_TERMINATED;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
/**
 * Unit tests for {@link OneWayJobUpdater}, exercised against mocked {@link UpdateStrategy},
 * {@link StateEvaluator} and {@link InstanceStateProvider} collaborators.
 *
 * <p>Instances are keyed by {@code Integer} ids 0 through 3 and instance state is modelled as an
 * opaque {@code String}.
 */
public class OneWayJobUpdaterTest extends EasyMockTest {
  private static final Set<Integer> EMPTY = ImmutableSet.of();

  private UpdateStrategy<Integer> strategy;
  private StateEvaluator<String> instance0;
  private StateEvaluator<String> instance1;
  private StateEvaluator<String> instance2;
  private StateEvaluator<String> instance3;
  private Map<Integer, StateEvaluator<String>> allInstances;
  private InstanceStateProvider<Integer, String> stateProvider;
  private OneWayJobUpdater<Integer, String> jobUpdater;

  /**
   * Creates fresh mocks for the strategy, the four instance evaluators and the state provider.
   * The updater under test is built by each test after {@code control.replay()}.
   */
  @Before
  public void setUp() {
    strategy = createMock(new Clazz<UpdateStrategy<Integer>>() { });
    instance0 = createMock(new Clazz<StateEvaluator<String>>() { });
    instance1 = createMock(new Clazz<StateEvaluator<String>>() { });
    instance2 = createMock(new Clazz<StateEvaluator<String>>() { });
    instance3 = createMock(new Clazz<StateEvaluator<String>>() { });
    allInstances = ImmutableMap.of(
        0, instance0,
        1, instance1,
        2, instance2,
        3, instance3);
    stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { });
  }

  /**
   * Evaluates the updater without reporting any instance state, and asserts the outcome.
   *
   * @param expectedStatus Overall status the evaluation should report.
   * @param expectedSideEffects Per-instance side effects the evaluation should report.
   */
  private void evaluate(
      OneWayStatus expectedStatus,
      Map<Integer, SideEffect> expectedSideEffects) {

    assertEquals(
        new EvaluationResult<>(expectedStatus, expectedSideEffects),
        jobUpdater.evaluate(ImmutableMap.of(), stateProvider));
  }

  /**
   * Evaluates the updater while reporting the state of a single instance, and asserts the outcome.
   *
   * @param instanceId Instance whose state is being reported.
   * @param state State reported for {@code instanceId}.
   * @param expectedStatus Overall status the evaluation should report.
   * @param expectedSideEffects Per-instance side effects the evaluation should report.
   */
  private void evaluate(
      int instanceId,
      String state,
      OneWayStatus expectedStatus,
      Map<Integer, SideEffect> expectedSideEffects) {

    assertEquals(
        new EvaluationResult<>(expectedStatus, expectedSideEffects),
        jobUpdater.evaluate(ImmutableMap.of(instanceId, state), stateProvider));
  }

  /**
   * Records one expected call to {@code instanceMock.evaluate(state)} returning {@code result}.
   * Use this when the state is handed to the updater directly rather than looked up.
   */
  private void expectEvaluate(
      StateEvaluator<String> instanceMock,
      String state,
      Result result) {

    expect(instanceMock.evaluate(state)).andReturn(result);
  }

  /**
   * Records an expected state lookup for {@code instanceId} on the state provider, followed by
   * an expected evaluation of that state by {@code instanceMock}.
   */
  private void expectFetchAndEvaluate(
      int instanceId,
      StateEvaluator<String> instanceMock,
      String state,
      Result result) {

    expect(stateProvider.getState(instanceId)).andReturn(state);
    expectEvaluate(instanceMock, state, result);
  }

  /** Builds a {@link SideEffect} from an optional action and statuses, with no third component. */
  private static SideEffect newSideEffect(
      Optional<InstanceAction> action,
      InstanceUpdateStatus[] statuses) {

    return new SideEffect(
        action,
        ImmutableSet.<InstanceUpdateStatus>builder().add(statuses).build(),
        Optional.empty());
  }

  /** Builds the expected {@link SideEffect} carrying {@code action} and the given statuses. */
  private static SideEffect sideEffect(InstanceAction action, InstanceUpdateStatus... statuses) {
    return newSideEffect(Optional.of(action), statuses);
  }

  /** Builds the expected {@link SideEffect} carrying only status changes and no action. */
  private static SideEffect sideEffect(InstanceUpdateStatus... statuses) {
    return newSideEffect(Optional.empty(), statuses);
  }

  /**
   * Walks all four instances through to success in two groups ({0, 2} then {1, 3}) and checks the
   * status and side effects reported by every evaluation along the way.
   */
  @Test
  public void testSuccessfulUpdate() {
    String s0 = "0";
    String s1 = "1";
    String s2 = "2";
    String s3 = "3";

    // Evaluation 1: the strategy picks instances 0 and 2; their states come from the provider.
    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
        .andReturn(ImmutableSet.of(0, 2));
    expectFetchAndEvaluate(0, instance0, s0, KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
    expectFetchAndEvaluate(2, instance2, s2, REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);

    // Evaluation 2: instance 0's state is supplied directly; it keeps waiting, no new group.
    expectEvaluate(instance0, s0, EVALUATE_ON_STATE_CHANGE);
    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(0, 2))).andReturn(EMPTY);

    // Evaluation 3: instance 0 succeeds; the strategy still yields no new group.
    expectEvaluate(instance0, s0, SUCCEEDED);
    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(2))).andReturn(EMPTY);

    // Evaluation 4: instance 2 succeeds and the strategy releases instances 1 and 3. Both are
    // newly started within this evaluation, so both of their states come from the provider.
    expectEvaluate(instance2, s2, SUCCEEDED);
    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), EMPTY))
        .andReturn(ImmutableSet.of(1, 3));
    expectFetchAndEvaluate(1, instance1, s1, SUCCEEDED);
    expectFetchAndEvaluate(3, instance3, s3, EVALUATE_AFTER_MIN_RUNNING_MS);

    // Evaluation 5: instance 3's state is supplied directly, so no provider lookup is expected.
    expectEvaluate(instance3, s3, SUCCEEDED);

    control.replay();

    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);

    evaluate(
        OneWayStatus.WORKING,
        ImmutableMap.of(
            0, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING),
            2, sideEffect(ADD_TASK, InstanceUpdateStatus.WORKING)));
    evaluate(
        0,
        s0,
        OneWayStatus.WORKING,
        ImmutableMap.of(0, sideEffect(AWAIT_STATE_CHANGE)));
    evaluate(
        0,
        s0,
        OneWayStatus.WORKING,
        ImmutableMap.of(0, sideEffect(InstanceUpdateStatus.SUCCEEDED)));
    evaluate(
        2,
        s2,
        OneWayStatus.WORKING,
        ImmutableMap.of(
            1, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED),
            2, sideEffect(InstanceUpdateStatus.SUCCEEDED),
            3, sideEffect(WATCH_TASK, InstanceUpdateStatus.WORKING)));
    evaluate(
        3,
        s3,
        OneWayStatus.SUCCEEDED,
        ImmutableMap.of(3, sideEffect(InstanceUpdateStatus.SUCCEEDED)));
  }

  /**
   * Verifies that an instance evaluating to {@code FAILED_TERMINATED} fails the whole update and
   * that the updater then refuses any further evaluation.
   */
  @Test
  public void testFailedUpdate() {
    String s0 = "0";
    String s1 = "1";

    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
        .andReturn(ImmutableSet.of(0, 1));
    expectFetchAndEvaluate(0, instance0, s0, FAILED_TERMINATED);
    expectFetchAndEvaluate(1, instance1, s1, KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);

    control.replay();

    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);

    evaluate(
        OneWayStatus.FAILED,
        ImmutableMap.of(
            0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.FAILED),
            1, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING)));

    // The updater should now reject further attempts to evaluate.
    try {
      jobUpdater.evaluate(ImmutableMap.of(), stateProvider);
      fail("Expected IllegalStateException when evaluating after the update has failed.");
    } catch (IllegalStateException e) {
      // Expected.
    }
  }

  /**
   * Verifies that reporting state for an instance that has already finished produces no side
   * effects and leaves the update in progress.
   */
  @Test
  public void testEvaluateCompletedInstance() {
    String s0 = "0";
    String s1 = "1";

    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
        .andReturn(ImmutableSet.of(0));
    expect(strategy.getNextGroup(ImmutableSet.of(1, 2, 3), EMPTY))
        .andReturn(ImmutableSet.of(1));
    expect(strategy.getNextGroup(ImmutableSet.of(2, 3), ImmutableSet.of(1)))
        .andReturn(EMPTY);
    expectFetchAndEvaluate(0, instance0, s0, SUCCEEDED);
    expectFetchAndEvaluate(1, instance1, s1, EVALUATE_ON_STATE_CHANGE);

    control.replay();

    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);

    evaluate(
        OneWayStatus.WORKING,
        ImmutableMap.of(
            0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED),
            1, sideEffect(AWAIT_STATE_CHANGE, InstanceUpdateStatus.WORKING)));

    // Instance 0 is already considered finished, so any further notifications of its state will
    // no-op.
    evaluate(
        0,
        s0,
        OneWayStatus.WORKING,
        ImmutableMap.of());
  }

  /**
   * Exercises {@code equals}/{@code hashCode} of {@link EvaluationResult}: equal status and side
   * effects compare equal (and collapse in a set); a differing status, differing side effects or
   * a foreign type do not.
   */
  @Test
  public void testResultsObjectOverrides() {
    control.replay();

    EvaluationResult<String> a = new EvaluationResult<>(
        OneWayStatus.WORKING,
        ImmutableMap.of("a", sideEffect(KILL_TASK)));
    EvaluationResult<String> a2 = new EvaluationResult<>(
        OneWayStatus.WORKING,
        ImmutableMap.of("a", sideEffect(KILL_TASK)));
    EvaluationResult<String> b = new EvaluationResult<>(
        OneWayStatus.WORKING,
        ImmutableMap.of("b", sideEffect(KILL_TASK)));
    EvaluationResult<String> c = new EvaluationResult<>(
        OneWayStatus.FAILED,
        ImmutableMap.of("b", sideEffect(KILL_TASK)));

    // Set de-duplication depends on hashCode agreeing with equals for a and a2.
    assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a2, b));
    assertEquals(a, a2);
    assertNotEquals(a, b);
    assertNotEquals(b, c);
    assertNotEquals(a, "");
  }

  /**
   * Exercises {@code equals} of {@link SideEffect}: the same action and statuses compare equal;
   * differing statuses, a differing action or a foreign type do not.
   */
  @Test
  public void testSideEffectObjectOverrides() {
    control.replay();

    SideEffect a = sideEffect(KILL_TASK);
    SideEffect b = sideEffect(KILL_TASK);
    SideEffect c = sideEffect(KILL_TASK, InstanceUpdateStatus.FAILED);
    SideEffect d = sideEffect(AWAIT_STATE_CHANGE, InstanceUpdateStatus.FAILED);

    assertEquals(a, b);
    assertNotEquals(a, c);
    assertNotEquals(c, d);
    assertNotEquals(a, "a string");
  }
}