blob: f2a93e8cf879d3b1cc7fb8a0f83b018a3e5f1644 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.preemptor;
import java.util.Optional;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.preemptor.Preemptor.PreemptorImpl;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.stats.CachedCounters;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.mesos.v1.Protos;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedByJobStatName;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedStatName;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptorByJobStatName;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationByJobStatName;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class PreemptorImplTest extends EasyMockTest {
private static final String TASK_NAME = "preemptor-name";
private static final IScheduledTask TASK = IScheduledTask.build(makeTask(TASK_NAME));
private static final String SLAVE_ID_1 = "slave_id_1";
private static final String SLAVE_ID_2 = "slave_id_2";
private static final IScheduledTask SLAVE_1_TASK = IScheduledTask.build(makeTask(SLAVE_ID_1));
private static final IScheduledTask SLAVE_2_TASK = IScheduledTask.build(makeTask(SLAVE_ID_2));
private static final PreemptionProposal PROPOSAL_1 = createPreemptionProposal(SLAVE_1_TASK,
SLAVE_ID_1);
private static final PreemptionProposal PROPOSAL_2 = createPreemptionProposal(SLAVE_2_TASK,
SLAVE_ID_2);
private static final TaskGroupKey GROUP_KEY =
TaskGroupKey.from(ITaskConfig.build(makeTask(TASK_NAME).getAssignedTask().getTask()));
private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of();
private static final Optional<String> EMPTY_RESULT = Optional.empty();
private static final HostOffer OFFER =
new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes()));
private StateManager stateManager;
private FakeStatsProvider statsProvider;
private PreemptionVictimFilter preemptionVictimFilter;
private PreemptorImpl preemptor;
private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
private Storage.MutableStoreProvider storeProvider;
@Before
public void setUp() {
storeProvider = createMock(Storage.MutableStoreProvider.class);
stateManager = createMock(StateManager.class);
preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
statsProvider = new FakeStatsProvider();
OfferManager offerManager = createMock(OfferManager.class);
expect(offerManager.get(anyObject(Protos.AgentID.class)))
.andReturn(Optional.of(OFFER))
.anyTimes();
preemptor = new PreemptorImpl(
stateManager,
offerManager,
preemptionVictimFilter,
new PreemptorMetrics(new CachedCounters(statsProvider)),
slotCache);
}
@Test
public void testPreemptTasksSuccessful() throws Exception {
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2));
slotCache.remove(PROPOSAL_1, GROUP_KEY);
expectSlotValidation(PROPOSAL_1, Optional.of(ImmutableSet.of(
PreemptionVictim.fromTask(SLAVE_1_TASK.getAssignedTask()))));
expectPreempted(TASK);
control.replay();
assertEquals(Optional.of(SLAVE_ID_1), callPreemptor());
assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(true,
TASK.getAssignedTask().getTask().getJob())));
PROPOSAL_1.getVictims().forEach(victim ->
assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName(
victim.getConfig().getJob()))));
assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName(
TASK.getAssignedTask().getTask().getJob())));
assertEquals(1L, statsProvider.getLongValue(preemptedStatName(true)));
}
@Test
public void testPreemptTasksValidationFailed() throws Exception {
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1));
slotCache.remove(PROPOSAL_1, GROUP_KEY);
expectSlotValidation(PROPOSAL_1, Optional.empty());
control.replay();
assertEquals(EMPTY_RESULT, callPreemptor());
assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false)));
assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(false,
TASK.getAssignedTask().getTask().getJob())));
assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
}
@Test
public void testMultiplePreemptionProposalsSuccessful() throws Exception {
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2));
slotCache.remove(PROPOSAL_1, GROUP_KEY);
expectSlotValidation(PROPOSAL_1, Optional.empty());
slotCache.remove(PROPOSAL_2, GROUP_KEY);
expectSlotValidation(PROPOSAL_2, Optional.of(ImmutableSet.of(
PreemptionVictim.fromTask(SLAVE_2_TASK.getAssignedTask()))));
expectPreempted(TASK);
control.replay();
assertEquals(Optional.of(SLAVE_ID_2), callPreemptor());
assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false)));
assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(false,
TASK.getAssignedTask().getTask().getJob())));
assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(true,
TASK.getAssignedTask().getTask().getJob())));
PROPOSAL_2.getVictims().forEach(victim ->
assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName(
victim.getConfig().getJob()))));
assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName(
TASK.getAssignedTask().getTask().getJob())));
assertEquals(1L, statsProvider.getLongValue(preemptedStatName(true)));
}
@Test
public void testMultiplePreemptionProposalsFailed() throws Exception {
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2));
slotCache.remove(PROPOSAL_1, GROUP_KEY);
expectSlotValidation(PROPOSAL_1, Optional.empty());
slotCache.remove(PROPOSAL_2, GROUP_KEY);
expectSlotValidation(PROPOSAL_2, Optional.empty());
control.replay();
assertEquals(EMPTY_RESULT, callPreemptor());
assertEquals(2L, statsProvider.getLongValue(slotValidationStatName(false)));
assertEquals(2L, statsProvider.getLongValue(slotValidationByJobStatName(false,
TASK.getAssignedTask().getTask().getJob())));
assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
}
@Test
public void testNoCachedSlot() throws Exception {
expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS);
control.replay();
assertEquals(EMPTY_RESULT, callPreemptor());
assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false)));
assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
}
private Optional<String> callPreemptor() {
return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), empty(), storeProvider);
}
private void expectSlotValidation(
PreemptionProposal slot,
Optional<ImmutableSet<PreemptionVictim>> victims) {
expect(preemptionVictimFilter.filterPreemptionVictims(
TASK.getAssignedTask().getTask(),
slot.getVictims(),
empty(),
Optional.of(OFFER),
storeProvider)).andReturn(victims);
}
private void expectPreempted(IScheduledTask preempted) throws Exception {
expect(stateManager.changeState(
anyObject(Storage.MutableStoreProvider.class),
eq(Tasks.id(preempted)),
eq(Optional.empty()),
eq(ScheduleStatus.PREEMPTING),
anyObject()))
.andReturn(StateChangeResult.SUCCESS);
}
private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) {
IAssignedTask assigned = task.getAssignedTask();
return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), slaveId);
}
private static ScheduledTask makeTask(String name) {
ScheduledTask task = new ScheduledTask()
.setAssignedTask(new AssignedTask()
.setTask(new TaskConfig()
.setPriority(1)
.setProduction(true)
.setJob(new JobKey("role", "env", name))));
task.addToTaskEvents(new TaskEvent(0, PENDING));
return task;
}
}