blob: 348386e938ce910c73f1e2e6e2b39f3654a85e7e [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.preemptor;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.ExecutorConfig;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.TierInfo;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.TaskExecutors;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.stats.CachedCounters;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.mesos.v1.Protos;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
import static java.util.stream.Collectors.toSet;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.GPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
import static org.apache.mesos.v1.Protos.Offer;
import static org.apache.mesos.v1.Protos.Resource;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class PreemptionVictimFilterTest extends EasyMockTest {
// Roles, job names and task IDs used to build the tasks in each scenario.
private static final String USER_A = "user_a";
private static final String USER_B = "user_b";
private static final String USER_C = "user_c";
private static final String JOB_A = "job_a";
private static final String JOB_B = "job_b";
private static final String JOB_C = "job_c";
private static final String TASK_ID_A = "task_a";
private static final String TASK_ID_B = "task_b";
private static final String TASK_ID_C = "task_c";
private static final String TASK_ID_D = "task_d";
// The single host (and its rack / agent ID) that every running task is assigned to.
private static final String HOST_A = "hostA";
private static final String RACK_A = "rackA";
private static final String SLAVE_ID = HOST_A + "_id";
// Attribute names used when building host attributes.
private static final String RACK_ATTRIBUTE = "rack";
private static final String HOST_ATTRIBUTE = "host";
// Offer ID passed to makeOffer(), and the "no offer available" value passed to runFilter().
private static final String OFFER = "offer";
private static final Optional<HostOffer> NO_OFFER = Optional.empty();
// Threshold passed to SchedulingFilterImpl in tests that use the real filter.
// NOTE(review): the name is misspelled ("UNAVAILABLITY"); it is kept because it is referenced
// throughout this class.
private static final Amount<Long, Time> UNAVAILABLITY_THRESHOLD = Amount.of(1L, Time.MINUTES);
// Fixture state. Everything except schedulingFilter is (re)created in setUp(); each test assigns
// schedulingFilter itself (either a mock or a real SchedulingFilterImpl).
private StorageTestUtil storageUtil;
private SchedulingFilter schedulingFilter;
private FakeStatsProvider statsProvider;
private PreemptorMetrics preemptorMetrics;
private TierManager tierManager;
private FakeClock clock;
// Creates fresh collaborators before every test. schedulingFilter is deliberately not set here;
// each test chooses between a mock and a real implementation.
@Before
public void setUp() {
  // Storage mocks and their baseline expectations.
  storageUtil = new StorageTestUtil(this);
  storageUtil.expectOperations();

  // Mocked tier lookup; individual tests register the expected getTier() calls.
  tierManager = createMock(TierManager.class);

  // Plain (non-mock) helpers.
  clock = new FakeClock();
  statsProvider = new FakeStatsProvider();
  preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));

  ResourceType.initializeEmptyCliArgsForTest();
}
/**
 * Runs a freshly built {@code PreemptionVictimFilterImpl} (wired with the current fixture
 * fields) for the given pending task.
 *
 * @param pendingTask task whose config is offered to the filter as the pending task.
 * @param offer optional host offer passed through to the filter.
 * @param victims running tasks converted to candidate preemption victims.
 * @return whatever the filter returns for these inputs.
 */
private Optional<ImmutableSet<PreemptionVictim>> runFilter(
    ScheduledTask pendingTask,
    Optional<HostOffer> offer,
    ScheduledTask... victims) {

  ITaskConfig pendingConfig = ITaskConfig.build(pendingTask.getAssignedTask().getTask());
  ImmutableSet<PreemptionVictim> candidates = preemptionVictims(victims);

  PreemptionVictimFilter.PreemptionVictimFilterImpl filterUnderTest =
      new PreemptionVictimFilter.PreemptionVictimFilterImpl(
          schedulingFilter,
          TaskExecutors.NO_OVERHEAD_EXECUTOR,
          preemptorMetrics,
          tierManager);

  return filterUnderTest.filterPreemptionVictims(
      pendingConfig,
      candidates,
      empty(),
      offer,
      storageUtil.mutableStoreProvider);
}
// A pending task with priority 100 selects a running default-priority task of the same role and
// job as its victim.
@Test
public void testPreempted() throws Exception {
  setUpHost();
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask running = makeTask(USER_A, JOB_A, TASK_ID_A);
  assignToHost(running);
  expectGetTier(running, DEV_TIER).times(2);

  ScheduledTask pending = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
  expectGetTier(pending, DEV_TIER);

  expectFiltering();

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pending, NO_OFFER, running);
  assertVictims(chosen, running);
}
// A pending task with priority 100 selects the running priority-1 task as its victim.
// NOTE(review): the priority-10 task is created and assigned to the host but is never handed to
// runFilter() as a candidate, so only the priority-1 task is actually evaluated. Confirm whether
// it was meant to be included.
@Test
public void testLowestPriorityPreempted() throws Exception {
  setUpHost();
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask priorityTen = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
  assignToHost(priorityTen);

  ScheduledTask priorityOne = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
  assignToHost(priorityOne);
  expectGetTier(priorityOne, DEV_TIER).atLeastOnce();

  ScheduledTask pending = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
  expectGetTier(pending, DEV_TIER);

  expectFiltering();

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pending, NO_OFFER, priorityOne);
  assertVictims(chosen, priorityOne);
}
// Three running tasks (priorities 100, 99 and 1) and a pending task with priority 98: only the
// priority-1 task is expected as a victim.
@Test
public void testOnePreemptableTask() throws Exception {
  setUpHost();
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask priority100 = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
  assignToHost(priority100);
  expectGetTier(priority100, DEV_TIER);

  ScheduledTask priority99 = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
  assignToHost(priority99);
  expectGetTier(priority99, DEV_TIER);

  ScheduledTask priority1 = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
  assignToHost(priority1);
  expectGetTier(priority1, DEV_TIER).times(2);

  ScheduledTask pending = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
  expectGetTier(pending, DEV_TIER).times(3);

  expectFiltering();

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pending, NO_OFFER, priority100, priority99, priority1);
  assertVictims(chosen, priority1);
}
// A default-priority pending task gets no victims when the only candidate runs at priority 100.
// No host attributes or scheduling-filter expectations are registered for this scenario.
@Test
public void testHigherPriorityRunning() throws Exception {
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask running = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
  assignToHost(running);
  expectGetTier(running, DEV_TIER);

  ScheduledTask pending = makeTask(USER_A, JOB_A, TASK_ID_A);
  expectGetTier(pending, DEV_TIER);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pending, NO_OFFER, running);
  assertNoVictims(chosen);
}
// A production (preferred tier) pending task selects a running dev-tier task as its victim.
@Test
public void testProductionPreemptingNonproduction() throws Exception {
  setUpHost();
  schedulingFilter = createMock(SchedulingFilter.class);

  // Use a very low priority for the production task to show that priority is irrelevant.
  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
  expectGetTier(pendingProd, PREFERRED_TIER);

  ScheduledTask runningDev = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
  expectGetTier(runningDev, DEV_TIER).times(2);
  assignToHost(runningDev);

  expectFiltering();

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pendingProd, NO_OFFER, runningDev);
  assertVictims(chosen, runningDev);
}
// Same as the single-user production case, but the running dev-tier task belongs to a
// different role (USER_B) than the pending production task (USER_A).
@Test
public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
  setUpHost();
  schedulingFilter = createMock(SchedulingFilter.class);

  // Use a very low priority for the production task to show that priority is irrelevant.
  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
  expectGetTier(pendingProd, PREFERRED_TIER);

  ScheduledTask otherUsersDev = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
  assignToHost(otherUsersDev);
  expectGetTier(otherUsersDev, DEV_TIER).times(2);

  expectFiltering();

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, otherUsersDev);
  assertVictims(chosen, otherUsersDev);
}
// A production task of one role gets no victims when the only candidate is a production task of
// another role, even though the pending priority (1000) exceeds the running one (0).
@Test
public void testProductionUsersDoNotPreemptEachOther() throws Exception {
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
  expectGetTier(pendingProd, PREFERRED_TIER);

  ScheduledTask runningProd = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
  expectGetTier(runningProd, PREFERRED_TIER);
  assignToHost(runningProd);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, runningProd);
  assertNoVictims(chosen);
}
// Ensures a production task can preempt 2 tasks on the same host: two dev-tier tasks of
// 1 CPU / 512 MB each are both selected for a pending task needing 2 CPUs / 1024 MB.
@Test
public void testProductionPreemptingManyNonProduction() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  ScheduledTask devA = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(devA, CPUS, 1.0);
  setResource(devA, RAM_MB, 512.0);
  expectGetTier(devA, DEV_TIER).atLeastOnce();

  ScheduledTask devB = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
  setResource(devB, CPUS, 1.0);
  setResource(devB, RAM_MB, 512.0);
  expectGetTier(devB, DEV_TIER).atLeastOnce();

  setUpHost();
  assignToHost(devA);
  assignToHost(devB);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  setResource(pendingProd, CPUS, 2.0);
  setResource(pendingProd, RAM_MB, 1024.0);
  expectGetTier(pendingProd, PREFERRED_TIER).times(2);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, devA, devB);
  assertVictims(chosen, devA, devB);
}
// Ensures we select the minimal number of tasks to preempt: with one 4 CPU / 4096 MB task and
// two 1 CPU / 512 MB tasks available, only the large task is expected as the victim of a
// pending task needing 2 CPUs / 1024 MB.
@Test
public void testMinimalSetPreempted() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  ScheduledTask large = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(large, CPUS, 4.0);
  setResource(large, RAM_MB, 4096.0);
  expectGetTier(large, DEV_TIER).atLeastOnce();

  ScheduledTask smallOne = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
  // NOTE(review): the resources are set explicitly and then reset to the same values through
  // setResource(); presumably one of the two is redundant. Confirm before removing.
  smallOne.getAssignedTask().getTask()
      .setResources(ImmutableSet.of(numCpus(1), ramMb(512)));
  setResource(smallOne, CPUS, 1.0);
  setResource(smallOne, RAM_MB, 512.0);
  expectGetTier(smallOne, DEV_TIER).anyTimes();

  ScheduledTask smallTwo = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
  setResource(smallTwo, CPUS, 1.0);
  setResource(smallTwo, RAM_MB, 512.0);
  expectGetTier(smallTwo, DEV_TIER).anyTimes();

  setUpHost();
  assignToHost(large);
  assignToHost(smallOne);
  assignToHost(smallTwo);

  ScheduledTask pendingProd = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
  setResource(pendingProd, CPUS, 2.0);
  setResource(pendingProd, RAM_MB, 1024.0);
  expectGetTier(pendingProd, PREFERRED_TIER).times(3);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, smallOne, smallTwo, large);
  assertVictims(chosen, large);
}
// Ensures a production task *never* preempts a production task from another job, even when the
// running task (2 CPUs / 1024 MB) holds more than the pending one needs (1 CPU / 512 MB).
@Test
public void testProductionJobNeverPreemptsProductionJob() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  ScheduledTask runningProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
  runningProd.getAssignedTask().getTask()
      .setResources(ImmutableSet.of(numCpus(2), ramMb(1024)));
  expectGetTier(runningProd, PREFERRED_TIER);

  setUpHost();
  assignToHost(runningProd);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
  pendingProd.getAssignedTask().getTask()
      .setResources(ImmutableSet.of(numCpus(1), ramMb(512)));
  expectGetTier(pendingProd, PREFERRED_TIER);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, runningProd);
  assertNoVictims(chosen);
}
// Ensures that we can preempt if a task + offer can satisfy a pending task: a 1 CPU / 512 MB
// victim combined with a 1 CPU / 512 MB offer covers the 2 CPU / 1024 MB pending task.
@Test
public void testPreemptWithOfferAndTask() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  setUpHost();

  ScheduledTask runningDev = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  runningDev.getAssignedTask().getTask()
      .setResources(ImmutableSet.of(numCpus(1), ramMb(512)));
  assignToHost(runningDev);
  expectGetTier(runningDev, DEV_TIER).times(2);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  pendingProd.getAssignedTask().getTask()
      .setResources(ImmutableSet.of(numCpus(2), ramMb(1024)));
  expectGetTier(pendingProd, PREFERRED_TIER);

  control.replay();

  Optional<HostOffer> offer =
      makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false);
  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pendingProd, offer, runningDev);
  assertVictims(chosen, runningDev);
}
// Ensures revocable offer resources are filtered out: the offer's CPU is revocable, so the
// 1 CPU / 512 MB victim plus the offer is not expected to satisfy the 2 CPU / 1024 MB task.
@Test
public void testRevocableOfferFiltered() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  setUpHost();

  ScheduledTask runningDev = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(runningDev, CPUS, 1.0);
  setResource(runningDev, RAM_MB, 512.0);
  assignToHost(runningDev);
  expectGetTier(runningDev, DEV_TIER).times(2);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  setResource(pendingProd, CPUS, 2.0);
  setResource(pendingProd, RAM_MB, 1024.0);
  expectGetTier(pendingProd, PREFERRED_TIER);

  control.replay();

  Optional<HostOffer> revocableOffer =
      makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, true);
  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, revocableOffer, runningDev);
  assertNoVictims(chosen);
}
// Ensures revocable task CPU is not considered for preemption: the candidate runs in the
// revocable tier, and together with a 1 CPU / 512 MB offer no victims are expected for the
// 2 CPU / 1024 MB pending task.
@Test
public void testRevocableVictimsFiltered() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  setUpHost();

  ScheduledTask runningRevocable = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(runningRevocable, CPUS, 1.0);
  setResource(runningRevocable, RAM_MB, 512.0);
  assignToHost(runningRevocable);
  expectGetTier(runningRevocable, REVOCABLE_TIER).times(2);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  setResource(pendingProd, CPUS, 2.0);
  setResource(pendingProd, RAM_MB, 1024.0);
  expectGetTier(pendingProd, PREFERRED_TIER);

  control.replay();

  Optional<HostOffer> offer =
      makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false);
  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, offer, runningRevocable);
  assertNoVictims(chosen);
}
// Ensures revocable victim non-compressible resources are still considered: the offer alone
// supplies 2 CPUs but only 512 MB, and the revocable-tier candidate (512 MB) is expected to be
// chosen as the victim of the 2 CPU / 1024 MB pending task.
@Test
public void testRevocableVictimRamUsed() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  setUpHost();

  ScheduledTask runningRevocable = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(runningRevocable, CPUS, 1.0);
  setResource(runningRevocable, RAM_MB, 512.0);
  assignToHost(runningRevocable);
  expectGetTier(runningRevocable, REVOCABLE_TIER).times(2);

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  setResource(pendingProd, CPUS, 2.0);
  setResource(pendingProd, RAM_MB, 1024.0);
  expectGetTier(pendingProd, PREFERRED_TIER);

  control.replay();

  Optional<HostOffer> offer =
      makeOffer(OFFER, 2, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false);
  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, offer, runningRevocable);
  assertVictims(chosen, runningRevocable);
}
// Ensures we can preempt if two tasks and an offer can satisfy a pending task: two
// 1 CPU / 512 MB victims plus a 2 CPU / 1024 MB offer cover the 4 CPU / 2048 MB pending task.
@Test
public void testPreemptWithOfferAndMultipleTasks() throws Exception {
  schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);

  setUpHost();

  ScheduledTask firstDev = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(firstDev, CPUS, 1.0);
  setResource(firstDev, RAM_MB, 512.0);
  assignToHost(firstDev);
  expectGetTier(firstDev, DEV_TIER).atLeastOnce();

  ScheduledTask secondDev = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
  setResource(secondDev, CPUS, 1.0);
  setResource(secondDev, RAM_MB, 512.0);
  assignToHost(secondDev);
  expectGetTier(secondDev, DEV_TIER).atLeastOnce();

  ScheduledTask pendingProd = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
  setResource(pendingProd, CPUS, 4.0);
  setResource(pendingProd, RAM_MB, 2048.0);
  expectGetTier(pendingProd, PREFERRED_TIER).times(2);

  control.replay();

  Optional<HostOffer> hostOffer =
      makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1, false);
  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, hostOffer, firstDev, secondDev);
  assertVictims(chosen, firstDev, secondDev);
}
// With no candidate victims and no offer, the filter is expected to return no victims.
@Test
public void testNoPreemptionVictims() {
  schedulingFilter = createMock(SchedulingFilter.class);
  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen = runFilter(pendingProd, NO_OFFER);
  assertNoVictims(chosen);
}
// When the attribute store has no attributes for the host, no victims are returned and the
// missing-attributes stat is expected to read 1.
@Test
public void testMissingAttributes() {
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
  assignToHost(pendingProd);
  expectGetTier(pendingProd, PREFERRED_TIER);

  ScheduledTask runningDev = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(runningDev, CPUS, 1.0);
  setResource(runningDev, RAM_MB, 512.0);
  assignToHost(runningDev);
  expectGetTier(runningDev, DEV_TIER);

  // The store reports no attributes for HOST_A.
  expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.empty());

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, runningDev);
  assertNoVictims(chosen);
  assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME));
}
// When the (mocked) scheduling filter answers with a veto, no victims are returned.
@Test
public void testAllVictimsVetoed() {
  schedulingFilter = createMock(SchedulingFilter.class);

  ScheduledTask pendingProd = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
  assignToHost(pendingProd);
  expectGetTier(pendingProd, PREFERRED_TIER);

  ScheduledTask runningDev = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
  setResource(runningDev, CPUS, 1.0);
  setResource(runningDev, RAM_MB, 512.0);
  assignToHost(runningDev);
  expectGetTier(runningDev, DEV_TIER).times(2);

  setUpHost();

  Optional<Veto> ban = Optional.of(Veto.constraintMismatch("ban"));
  expectFiltering(ban);

  control.replay();

  Optional<ImmutableSet<PreemptionVictim>> chosen =
      runFilter(pendingProd, NO_OFFER, runningDev);
  assertNoVictims(chosen);
}
// ORDER.sortedCopy() is expected to arrange bags ascending, keeping duplicates.
@Test
public void testOrder() {
  control.replay();

  ResourceBag small = bag(1, 1, 1);
  ResourceBag medium = bag(2, 2, 2);
  ResourceBag large = bag(3, 3, 3);

  List<ResourceBag> shuffled = ImmutableList.of(large, small, medium, large);
  List<ResourceBag> ascending = ImmutableList.of(small, medium, large, large);

  assertEquals(ascending, ORDER.sortedCopy(shuffled));
}
// Compares bags holding different resource types: CPU+RAM vs CPU+GPU is expected to compare as
// equal (0), and CPU+RAM vs CPU-only as greater (1).
@Test
public void testOrderDifferentResources() {
  control.replay();

  ResourceBag cpuAndRam = bag(ImmutableMap.of(CPUS, 1.0, RAM_MB, 1.0));
  ResourceBag cpuAndGpu = bag(ImmutableMap.of(CPUS, 1.0, GPUS, 1.0));
  ResourceBag cpuOnly = bag(ImmutableMap.of(CPUS, 1.0));

  assertEquals(0, ORDER.compare(cpuAndRam, cpuAndGpu));
  assertEquals(1, ORDER.compare(cpuAndRam, cpuOnly));
}
/**
 * Converts scheduled tasks into the {@code PreemptionVictim} set the filter consumes.
 *
 * @param tasks tasks to convert; duplicates collapse into a single entry.
 * @return victims in the order their tasks were first given.
 */
private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) {
  ImmutableSet.Builder<PreemptionVictim> victims = ImmutableSet.builder();
  for (ScheduledTask task : tasks) {
    victims.add(PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())));
  }
  return victims.build();
}
/**
 * Asserts that the filter result is present and equals the victims built from {@code expected}.
 */
private static void assertVictims(
    Optional<ImmutableSet<PreemptionVictim>> actual,
    ScheduledTask... expected) {

  Optional<ImmutableSet<PreemptionVictim>> wanted = Optional.of(preemptionVictims(expected));
  assertEquals(wanted, actual);
}
/** Asserts that the filter result is an empty {@link Optional}. */
private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) {
  Optional<ImmutableSet<PreemptionVictim>> nothing = Optional.empty();
  assertEquals(nothing, actual);
}
/**
 * Builds a host offer for {@code HOST_A} / {@code SLAVE_ID} carrying the given resources.
 *
 * @param offerId value used as the offer ID.
 * @param cpu CPU amount offered.
 * @param ram RAM offered; its raw value is used for the RAM_MB scalar.
 * @param disk disk offered; its raw value is used for the DISK_MB scalar.
 * @param numPorts number of ports offered (ports {@code 1..numPorts}).
 * @param revocable when {@code true}, the CPU resource is replaced by a revocable CPU resource
 *     of the same amount.
 * @return the offer, always present.
 */
private Optional<HostOffer> makeOffer(
    String offerId,
    double cpu,
    Amount<Long, Data> ram,
    Amount<Long, Data> disk,
    int numPorts,
    boolean revocable) {

  List<Resource> resources = ImmutableList.of(
      mesosScalar(CPUS, cpu),
      mesosScalar(RAM_MB, ram.getValue()),
      mesosScalar(DISK_MB, disk.getValue()),
      mesosRange(
          PORTS,
          Optional.empty(),
          // rangeClosed so that exactly numPorts ports are offered; the half-open range(1, n)
          // produced n - 1 ports, i.e. none at all for numPorts == 1.
          IntStream.rangeClosed(1, numPorts).boxed().collect(toSet())));

  if (revocable) {
    // Swap the plain CPU entry for a revocable one, leaving every other resource untouched.
    resources = ImmutableList.<Resource>builder()
        .addAll(FluentIterable.from(resources)
            .filter(e -> !e.getName().equals(CPUS.getMesosName()))
            .toList())
        .add(Protos.Resource.newBuilder()
            .setName(CPUS.getMesosName())
            .setType(Protos.Value.Type.SCALAR)
            .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu))
            .setRevocable(Resource.RevocableInfo.newBuilder())
            .build())
        .build();
  }

  Offer.Builder builder = Offer.newBuilder();
  builder.getIdBuilder().setValue(offerId);
  builder.getFrameworkIdBuilder().setValue("framework-id");
  builder.getAgentIdBuilder().setValue(SLAVE_ID);
  builder.setHostname(HOST_A);
  builder.addAllResources(resources);

  return Optional.of(new HostOffer(
      builder.build(),
      IHostAttributes.build(new HostAttributes().setMode(NONE))));
}
/** Expects one scheduling-filter call that answers with no vetoes. */
private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() {
  Optional<Veto> noVeto = Optional.empty();
  return expectFiltering(noVeto);
}
/**
 * Expects a scheduling-filter call with any arguments.
 *
 * @param veto when present, the call answers with a set holding just this veto; otherwise it
 *     answers with an empty set.
 * @return the expectation setter, so callers can adjust the call count.
 */
private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering(
    final Optional<Veto> veto) {

  // The answer is immutable, so it can be computed once up front.
  final ImmutableSet<Veto> vetoes = veto.map(ImmutableSet::of).orElse(ImmutableSet.of());

  return expect(schedulingFilter.filter(EasyMock.anyObject(), EasyMock.anyObject()))
      .andAnswer(() -> vetoes);
}
/**
 * Expects a tier lookup for the task's current config and makes it return {@code tier}.
 * The config is snapshotted when this method is called.
 */
private IExpectationSetters<TierInfo> expectGetTier(ScheduledTask task, TierInfo tier) {
  ITaskConfig config = ITaskConfig.build(task.getAssignedTask().getTask());
  return expect(tierManager.getTier(config)).andReturn(tier);
}
/**
 * Replaces the task's config with the result of {@code ResourceTestUtil.resetResource} for the
 * given resource type and value.
 */
private static void setResource(ScheduledTask task, ResourceType type, Double value) {
  AssignedTask assigned = task.getAssignedTask();
  ITaskConfig current = ITaskConfig.build(assigned.getTask());
  assigned.setTask(ResourceTestUtil.resetResource(current, type, value).newBuilder());
}
/**
 * Builds a scheduled task with the given identity and settings, no constraints, and the
 * Aurora executor config.
 *
 * @param role role part of the job key.
 * @param job job name part of the job key.
 * @param taskId ID assigned to the task.
 * @param priority task priority.
 * @param env environment part of the job key.
 * @param production value of the config's production flag.
 */
private static ScheduledTask makeTask(
    String role,
    String job,
    String taskId,
    int priority,
    String env,
    boolean production) {

  TaskConfig config = new TaskConfig()
      .setJob(new JobKey(role, env, job))
      .setPriority(priority)
      .setProduction(production)
      .setConstraints(Sets.newHashSet())
      .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config"));

  AssignedTask assigned = new AssignedTask()
      .setTaskId(taskId)
      .setTask(config);

  return new ScheduledTask().setAssignedTask(assigned);
}
/** Builds a non-production task in the "dev" environment with priority 0. */
static ScheduledTask makeTask(String role, String job, String taskId) {
  final int defaultPriority = 0;
  final boolean production = false;
  return makeTask(role, job, taskId, defaultPriority, "dev", production);
}
/** Appends a task event with timestamp 0 and the given status to the task. */
static void addEvent(ScheduledTask task, ScheduleStatus status) {
  TaskEvent event = new TaskEvent(0, status);
  task.addToTaskEvents(event);
}
/** Builds a production task in the "prod" environment with priority 0. */
private ScheduledTask makeProductionTask(String role, String job, String taskId) {
  final int defaultPriority = 0;
  final boolean production = true;
  return makeTask(role, job, taskId, defaultPriority, "prod", production);
}
/** Builds a production task in the "prod" environment with the given priority. */
private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
  final boolean production = true;
  return makeTask(role, job, taskId, priority, "prod", production);
}
/** Builds a non-production task in the "dev" environment with the given priority. */
private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
  final boolean production = false;
  return makeTask(role, job, taskId, priority, "dev", production);
}
/** Marks the task as RUNNING (status plus event) and places it on HOST_A / SLAVE_ID. */
private void assignToHost(ScheduledTask task) {
  // Placement.
  AssignedTask assigned = task.getAssignedTask();
  assigned.setSlaveHost(HOST_A);
  assigned.setSlaveId(SLAVE_ID);

  // Lifecycle state.
  task.setStatus(RUNNING);
  addEvent(task, RUNNING);
}
/** Builds a single-valued "host" attribute. */
private Attribute host(String host) {
  Set<String> values = ImmutableSet.of(host);
  return new Attribute(HOST_ATTRIBUTE, values);
}
/** Builds a single-valued "rack" attribute. */
private Attribute rack(String rack) {
  Set<String> values = ImmutableSet.of(rack);
  return new Attribute(RACK_ATTRIBUTE, values);
}
// Sets up a normal host, no dedicated hosts and no maintenance.
// Registers an attribute-store expectation returning HOST_A's attributes any number of times.
private void setUpHost() {
  IHostAttributes hostAttrs = IHostAttributes.build(
      new HostAttributes()
          .setHost(HOST_A)
          // Same agent ID that assignToHost() and makeOffer() use.
          .setSlaveId(SLAVE_ID)
          .setMode(NONE)
          // The host attribute carries the host name; it previously carried RACK_A by mistake.
          .setAttributes(ImmutableSet.of(rack(RACK_A), host(HOST_A))));

  expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
      .andReturn(Optional.of(hostAttrs)).anyTimes();
}
}