| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.updater; |
| |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| |
| import org.apache.aurora.common.testing.easymock.EasyMockTest; |
| import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.scheduler.updater.InstanceAction.ADD_TASK; |
| import static org.apache.aurora.scheduler.updater.InstanceAction.AWAIT_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.InstanceAction.KILL_TASK; |
| import static org.apache.aurora.scheduler.updater.InstanceAction.WATCH_TASK; |
| import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult; |
| import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus; |
| import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED_TERMINATED; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE; |
| import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED; |
| import static org.easymock.EasyMock.expect; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.fail; |
| |
| public class OneWayJobUpdaterTest extends EasyMockTest { |
| private static final Set<Integer> EMPTY = ImmutableSet.of(); |
| |
| private UpdateStrategy<Integer> strategy; |
| private StateEvaluator<String> instance0; |
| private StateEvaluator<String> instance1; |
| private StateEvaluator<String> instance2; |
| private StateEvaluator<String> instance3; |
| private Map<Integer, StateEvaluator<String>> allInstances; |
| private InstanceStateProvider<Integer, String> stateProvider; |
| |
| private OneWayJobUpdater<Integer, String> jobUpdater; |
| |
| @Before |
| public void setUp() { |
| strategy = createMock(new Clazz<UpdateStrategy<Integer>>() { }); |
| instance0 = createMock(new Clazz<StateEvaluator<String>>() { }); |
| instance1 = createMock(new Clazz<StateEvaluator<String>>() { }); |
| instance2 = createMock(new Clazz<StateEvaluator<String>>() { }); |
| instance3 = createMock(new Clazz<StateEvaluator<String>>() { }); |
| allInstances = ImmutableMap.of( |
| 0, instance0, |
| 1, instance1, |
| 2, instance2, |
| 3, instance3); |
| stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { }); |
| } |
| |
| private void evaluate(OneWayStatus expectedStatus, Map<Integer, SideEffect> expectedSideEffects) { |
| assertEquals( |
| new EvaluationResult<>(expectedStatus, expectedSideEffects), |
| jobUpdater.evaluate(ImmutableMap.of(), stateProvider)); |
| } |
| |
| private void evaluate( |
| int instanceId, |
| String state, |
| OneWayStatus expectedStatus, |
| Map<Integer, SideEffect> expectedSideEffects) { |
| |
| assertEquals( |
| new EvaluationResult<>(expectedStatus, expectedSideEffects), |
| jobUpdater.evaluate(ImmutableMap.of(instanceId, state), stateProvider)); |
| } |
| |
| private void expectEvaluate( |
| StateEvaluator<String> instanceMock, |
| String state, |
| Result result) { |
| |
| expect(instanceMock.evaluate(state)).andReturn(result); |
| } |
| |
| private void expectFetchAndEvaluate( |
| int instanceId, |
| StateEvaluator<String> instanceMock, |
| String state, |
| Result result) { |
| |
| expect(stateProvider.getState(instanceId)).andReturn(state); |
| expect(instanceMock.evaluate(state)).andReturn(result); |
| } |
| |
| private static SideEffect sideEffect(InstanceAction action, InstanceUpdateStatus... statuses) { |
| return new SideEffect( |
| Optional.of(action), |
| ImmutableSet.<InstanceUpdateStatus>builder().add(statuses).build(), |
| Optional.empty()); |
| } |
| |
| private static SideEffect sideEffect(InstanceUpdateStatus... statuses) { |
| return new SideEffect( |
| Optional.empty(), |
| ImmutableSet.<InstanceUpdateStatus>builder().add(statuses).build(), |
| Optional.empty()); |
| } |
| |
| @Test |
| public void testSuccessfulUpdate() { |
| expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY)) |
| .andReturn(ImmutableSet.of(0, 2)); |
| String s0 = "0"; |
| String s1 = "1"; |
| String s2 = "2"; |
| String s3 = "3"; |
| expectFetchAndEvaluate( |
| 0, |
| instance0, |
| s0, |
| KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| expectFetchAndEvaluate( |
| 2, |
| instance2, |
| s2, |
| REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| |
| expectEvaluate(instance0, s0, EVALUATE_ON_STATE_CHANGE); |
| expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(0, 2))).andReturn(EMPTY); |
| expectEvaluate(instance0, s0, SUCCEEDED); |
| expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(2))).andReturn(EMPTY); |
| expectEvaluate(instance2, s2, SUCCEEDED); |
| expect(strategy.getNextGroup(ImmutableSet.of(1, 3), EMPTY)) |
| .andReturn(ImmutableSet.of(1, 3)); |
| expectFetchAndEvaluate(1, instance1, s1, SUCCEEDED); |
| expectEvaluate(instance3, s3, EVALUATE_AFTER_MIN_RUNNING_MS); |
| expectFetchAndEvaluate(3, instance3, s3, SUCCEEDED); |
| |
| control.replay(); |
| |
| jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances); |
| |
| evaluate( |
| OneWayStatus.WORKING, |
| ImmutableMap.of( |
| 0, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING), |
| 2, sideEffect(ADD_TASK, InstanceUpdateStatus.WORKING))); |
| evaluate( |
| 0, |
| s0, |
| OneWayStatus.WORKING, |
| ImmutableMap.of(0, sideEffect(AWAIT_STATE_CHANGE))); |
| evaluate( |
| 0, |
| s0, |
| OneWayStatus.WORKING, |
| ImmutableMap.of(0, sideEffect(InstanceUpdateStatus.SUCCEEDED))); |
| evaluate( |
| 2, |
| s2, |
| OneWayStatus.WORKING, |
| ImmutableMap.of( |
| 1, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED), |
| 2, sideEffect(InstanceUpdateStatus.SUCCEEDED), |
| 3, sideEffect(WATCH_TASK, InstanceUpdateStatus.WORKING))); |
| evaluate( |
| 3, |
| s3, |
| OneWayStatus.SUCCEEDED, |
| ImmutableMap.of(3, sideEffect(InstanceUpdateStatus.SUCCEEDED))); |
| } |
| |
| @Test |
| public void testFailedUpdate() { |
| expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY)) |
| .andReturn(ImmutableSet.of(0, 1)); |
| String s0 = "0"; |
| String s1 = "1"; |
| expectFetchAndEvaluate( |
| 0, |
| instance0, |
| s0, |
| FAILED_TERMINATED); |
| expectFetchAndEvaluate( |
| 1, |
| instance1, |
| s1, |
| KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE); |
| |
| control.replay(); |
| |
| jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances); |
| |
| evaluate( |
| OneWayStatus.FAILED, |
| ImmutableMap.of( |
| 0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.FAILED), |
| 1, sideEffect(KILL_TASK, InstanceUpdateStatus.WORKING))); |
| |
| // The updater should now reject further attempts to evaluate. |
| try { |
| jobUpdater.evaluate(ImmutableMap.of(), stateProvider); |
| fail(); |
| } catch (IllegalStateException e) { |
| // Expected. |
| } |
| } |
| |
| @Test |
| public void testEvaluateCompletedInstance() { |
| expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY)) |
| .andReturn(ImmutableSet.of(0)); |
| expect(strategy.getNextGroup(ImmutableSet.of(1, 2, 3), EMPTY)) |
| .andReturn(ImmutableSet.of(1)); |
| expect(strategy.getNextGroup(ImmutableSet.of(2, 3), ImmutableSet.of(1))) |
| .andReturn(ImmutableSet.of()); |
| String s0 = "0"; |
| String s1 = "1"; |
| expectFetchAndEvaluate( |
| 0, |
| instance0, |
| s0, |
| SUCCEEDED); |
| expectFetchAndEvaluate( |
| 1, |
| instance1, |
| s1, |
| EVALUATE_ON_STATE_CHANGE); |
| |
| control.replay(); |
| |
| jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances); |
| |
| evaluate( |
| OneWayStatus.WORKING, |
| ImmutableMap.of( |
| 0, sideEffect(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED), |
| 1, sideEffect(AWAIT_STATE_CHANGE, InstanceUpdateStatus.WORKING))); |
| |
| // Instance 0 is already considered finished, so any further notifications of its state will |
| // no-op. |
| evaluate( |
| 0, |
| s0, |
| OneWayStatus.WORKING, |
| ImmutableMap.of()); |
| } |
| |
| @Test |
| public void testResultsObjectOverrides() { |
| control.replay(); |
| |
| EvaluationResult<String> a = new EvaluationResult<>( |
| OneWayStatus.WORKING, |
| ImmutableMap.of("a", sideEffect(KILL_TASK))); |
| EvaluationResult<String> a2 = new EvaluationResult<>( |
| OneWayStatus.WORKING, |
| ImmutableMap.of("a", sideEffect(KILL_TASK))); |
| EvaluationResult<String> b = new EvaluationResult<>( |
| OneWayStatus.WORKING, |
| ImmutableMap.of("b", sideEffect(KILL_TASK))); |
| EvaluationResult<String> c = new EvaluationResult<>( |
| OneWayStatus.FAILED, |
| ImmutableMap.of("b", sideEffect(KILL_TASK))); |
| assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a2, b)); |
| assertEquals(a, a2); |
| assertNotEquals(a, b); |
| assertNotEquals(b, c); |
| assertNotEquals(a, ""); |
| } |
| |
| @Test |
| public void testSideEffectObjectOverrides() { |
| control.replay(); |
| |
| SideEffect a = sideEffect(KILL_TASK); |
| SideEffect b = sideEffect(KILL_TASK); |
| SideEffect c = sideEffect(KILL_TASK, InstanceUpdateStatus.FAILED); |
| SideEffect d = sideEffect(AWAIT_STATE_CHANGE, InstanceUpdateStatus.FAILED); |
| assertEquals(a, b); |
| assertNotEquals(a, c); |
| assertNotEquals(c, d); |
| assertNotEquals(a, "a string"); |
| } |
| } |