blob: efe0c06312eca6321abe0db5153e881ce601bf1b [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.updater;
import java.util.Optional;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.InstanceKey;
import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.PercentageSlaPolicy;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.SlaPolicy;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask;
import org.easymock.Capture;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class KillTaskTest extends EasyMockTest {
private static final String TASK_ID = "task_id";
private static final IJobKey JOB = JobKeys.from("role", "env", "job");
private static final IInstanceKey INSTANCE =
IInstanceKey.build(new InstanceKey(JOB.newBuilder(), 0));
private static final IJobUpdateKey UPDATE_ID =
IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "update_id"));
private static final SlaPolicy TEST_SLA_POLICY_OLD = SlaPolicy.percentageSlaPolicy(
new PercentageSlaPolicy()
.setPercentage(95)
.setDurationSecs(3600));
private static final SlaPolicy TEST_SLA_POLICY_NEW = SlaPolicy.percentageSlaPolicy(
new PercentageSlaPolicy()
.setPercentage(0)
.setDurationSecs(0));
private static final ITaskConfig CONFIG_NO_SLA = ITaskConfig.build(
TaskTestUtil.makeConfig(JOB, true, Optional.empty()).newBuilder());
private static final ITaskConfig OLD_CONFIG_WITH_SLA =
TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_OLD));
private static final ITaskConfig NEW_CONFIG_WITH_SLA =
TaskTestUtil.makeConfig(JOB, true, Optional.of(TEST_SLA_POLICY_NEW));
private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build(
new JobUpdateInstructions()
.setSettings(
new JobUpdateSettings()
.setMinWaitInInstanceRunningMs(1000)));
private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE = IJobUpdateInstructions.build(
new JobUpdateInstructions()
.setSettings(
new JobUpdateSettings()
.setMinWaitInInstanceRunningMs(1000)
.setSlaAware(true))
.setInitialState(ImmutableSet.of(new InstanceTaskConfig(
OLD_CONFIG_WITH_SLA.newBuilder(),
ImmutableSet.of(new Range(0, 0)))))
.setDesiredState(new InstanceTaskConfig(
NEW_CONFIG_WITH_SLA.newBuilder(),
ImmutableSet.of(new Range(0, 0)))));
private static final IJobUpdateInstructions INSTRUCTIONS_SLA_AWARE_NO_POLICY
= IJobUpdateInstructions.build(
new JobUpdateInstructions()
.setSettings(
new JobUpdateSettings()
.setMinWaitInInstanceRunningMs(1000)
.setSlaAware(true))
.setDesiredState(new InstanceTaskConfig(
CONFIG_NO_SLA.newBuilder(),
ImmutableSet.of(new Range(0, 0)))));
private StorageTestUtil storageUtil;
private StateManager stateManager;
private InstanceActionHandler handler;
private UpdateAgentReserver updateAgentReserver;
private SlaKillController slaKillController;
@Before
public void setUp() {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
updateAgentReserver = createMock(UpdateAgentReserver.class);
handler = new KillTask(false);
slaKillController = createMock(SlaKillController.class);
}
@Test
public void testInstanceKill() throws Exception {
storageUtil.expectTaskFetch(
Query.instanceScoped(INSTANCE).active(),
TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey()));
expect(stateManager.changeState(
storageUtil.mutableStoreProvider,
TASK_ID,
Optional.empty(),
ScheduleStatus.KILLING,
Optional.of("Killed for job update " + UPDATE_ID.getId())))
.andReturn(StateChangeResult.SUCCESS);
control.replay();
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_BACK,
UPDATE_ID,
slaKillController);
}
@Test
public void testKillForUpdateReservesAgentForInstance() throws Exception {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
expect(stateManager.changeState(
storageUtil.mutableStoreProvider,
TASK_ID,
Optional.empty(),
ScheduleStatus.KILLING,
Optional.of("Killed for job update " + UPDATE_ID.getId())))
.andReturn(StateChangeResult.SUCCESS);
updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
expectLastCall();
control.replay();
new KillTask(true).getReevaluationDelay(
INSTANCE,
INSTRUCTIONS,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_BACK,
UPDATE_ID,
slaKillController);
}
@Test
public void testInstanceNotFoundDoesNotThrow() throws Exception {
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active());
control.replay();
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_BACK,
UPDATE_ID,
slaKillController);
}
/**
* Ensures that if an instance is killed with code {@code slaAware} option in the instructions,
* then the kill is sent to be handled by the {@link SlaKillController}.
*/
@Test
public void testInstanceKillSlaAware() throws Exception {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey());
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
slaKillController.slaKill(
eq(storageUtil.mutableStoreProvider),
eq(INSTANCE),
eq(task),
eq(UPDATE_ID),
eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
eq(JobUpdateStatus.ROLLING_FORWARD),
anyObject());
expectLastCall();
control.replay();
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS_SLA_AWARE,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_FORWARD,
UPDATE_ID,
slaKillController);
}
/**
* Ensures that if an instance killed while {@link JobUpdateStatus#ROLLING_BACK}, it uses the old
* configuration.
*/
@Test
public void testInstanceKillSlaAwareRollback() throws Exception {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = createCapture();
slaKillController.slaKill(
eq(storageUtil.mutableStoreProvider),
eq(INSTANCE),
eq(task),
eq(UPDATE_ID),
eq(INSTRUCTIONS_SLA_AWARE.getInitialState().asList().get(0).getTask().getSlaPolicy()),
eq(JobUpdateStatus.ROLLING_BACK),
capture(killCommandCapture));
expectLastCall();
control.replay();
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS_SLA_AWARE,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_BACK,
UPDATE_ID,
slaKillController);
}
/**
* Ensure the correct behavior of the consumer passed into
* {@link SlaKillController#slaKill}. It should behave as
* {@link KillTask#killAndMaybeReserve}.
*/
@Test
public void testInstanceKillSlaAwareKillCommand() throws Exception {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
Capture<Consumer<Storage.MutableStoreProvider>> killCommandCapture = createCapture();
slaKillController.slaKill(
eq(storageUtil.mutableStoreProvider),
eq(INSTANCE),
eq(task),
eq(UPDATE_ID),
eq(INSTRUCTIONS_SLA_AWARE.getDesiredState().getTask().getSlaPolicy()),
eq(JobUpdateStatus.ROLLING_FORWARD),
capture(killCommandCapture));
expectLastCall();
// Ensure the correct kill command consumer has been passed in and expected behavior occurs
// when executed.
expect(stateManager.changeState(
storageUtil.mutableStoreProvider,
TASK_ID,
Optional.empty(),
ScheduleStatus.KILLING,
Optional.of("Killed for job update " + UPDATE_ID.getId())))
.andReturn(StateChangeResult.SUCCESS);
updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
expectLastCall();
control.replay();
new KillTask(true).getReevaluationDelay(
INSTANCE,
INSTRUCTIONS_SLA_AWARE,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_FORWARD,
UPDATE_ID,
slaKillController);
assertTrue(killCommandCapture.hasCaptured());
killCommandCapture.getValue().accept(storageUtil.mutableStoreProvider);
}
/**
* Ensures that if an instance is killed with code {@code slaAware} option in the instructions
* but no {@link org.apache.aurora.gen.SlaPolicy} with the task, then the kill does not fail but
* instead continues as a non-sla-aware kill.
*/
@Test
public void testInstanceKillSlaAwareMissingSlaPolicy() throws Exception {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
expect(stateManager.changeState(
storageUtil.mutableStoreProvider,
TASK_ID,
Optional.empty(),
ScheduleStatus.KILLING,
Optional.of("Killed for job update " + UPDATE_ID.getId())))
.andReturn(StateChangeResult.SUCCESS);
control.replay();
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS_SLA_AWARE_NO_POLICY,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLLING_FORWARD,
UPDATE_ID,
slaKillController);
}
/**
* Ensures that if SLA-aware kill is called while not in an active state we throw an exception.
*/
@Test
public void testInstanceKillSlaAwareBadStatus() {
IScheduledTask task = TaskTestUtil.makeTask(TASK_ID, INSTANCE.getJobKey(), 1, "agent01");
storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
control.replay();
try {
handler.getReevaluationDelay(
INSTANCE,
INSTRUCTIONS_SLA_AWARE_NO_POLICY,
storageUtil.mutableStoreProvider,
stateManager,
updateAgentReserver,
JobUpdateStatus.ROLL_FORWARD_PAUSED,
UPDATE_ID,
slaKillController);
fail();
} catch (UpdateStateException e) {
// Expected
}
}
}