blob: 6321ec068bd16737f96b39d5fdd8db25f3dea15c [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.TaskVars.VETO_GROUPS_TO_COUNTERS;
import static org.apache.aurora.scheduler.TaskVars.VETO_TYPE_TO_COUNTERS;
import static org.apache.aurora.scheduler.TaskVars.jobStatName;
import static org.apache.aurora.scheduler.TaskVars.rackStatName;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
public class TaskVarsTest extends EasyMockTest {
private static final IJobKey JOB_A = JobKeys.from("role_a", "test", "job_a");
private static final IJobKey JOB_B = JobKeys.from("role_a", "test", "job_b");
private static final String STATIC_COUNTER = VETO_GROUPS_TO_COUNTERS.get(VetoGroup.STATIC);
private static final String DYNAMIC_COUNTER = VETO_GROUPS_TO_COUNTERS.get(VetoGroup.DYNAMIC);
private static final String MIXED_COUNTER = VETO_GROUPS_TO_COUNTERS.get(VetoGroup.MIXED);
private static final String INSUFFICIENT_RESOURCES_COUNTER = VETO_TYPE_TO_COUNTERS.get(
VetoType.INSUFFICIENT_RESOURCES);
private static final String LIMIT_NOT_SATISFIED_COUNTER = VETO_TYPE_TO_COUNTERS.get(
VetoType.LIMIT_NOT_SATISFIED);
private StorageTestUtil storageUtil;
private StatsProvider trackedProvider;
private StatsProvider untrackedProvider;
private TaskVars vars;
private Map<String, Supplier<Long>> globalCounters;
@Before
public void setUp() {
storageUtil = new StorageTestUtil(this);
trackedProvider = createMock(StatsProvider.class);
untrackedProvider = createMock(StatsProvider.class);
expect(trackedProvider.untracked()).andReturn(untrackedProvider);
storageUtil.expectOperations();
globalCounters = Maps.newHashMap();
}
private void replayAndBuild() {
control.replay();
vars = new TaskVars(storageUtil.storage, trackedProvider);
}
private void expectStatExport(String name) {
expectStatExport(name, trackedProvider);
}
private void expectStatExport(String name, StatsProvider provider) {
expect(provider.makeGauge(EasyMock.eq(name), EasyMock.<Supplier<Long>>anyObject()))
.andAnswer(() -> {
assertFalse(globalCounters.containsKey(name));
@SuppressWarnings("unchecked")
Supplier<Long> varSupplier = (Supplier<Long>) EasyMock.getCurrentArguments()[1];
globalCounters.put(name, varSupplier);
return null;
});
}
private void expectStatusCountersInitialized() {
for (ScheduleStatus status : ScheduleStatus.values()) {
expectStatExport(TaskVars.getVarName(status));
}
}
private void changeState(IScheduledTask task, ScheduleStatus status) {
vars.taskChangedState(TaskStateChange.transition(
IScheduledTask.build(task.newBuilder().setStatus(status)),
task.getStatus()));
}
private void applyVeto(Veto... vetoes) {
vars.taskVetoed(ImmutableSet.copyOf(vetoes));
}
private void schedulerActivated(IScheduledTask... initialTasks) {
for (IScheduledTask task : initialTasks) {
vars.taskChangedState(TaskStateChange.initialized(task));
}
vars.startAsync().awaitRunning();
}
private IScheduledTask makeTask(IJobKey job, ScheduleStatus status, String host) {
ScheduledTask task = TaskTestUtil.makeTask("task_id", job).newBuilder()
.setStatus(status);
if (Tasks.SLAVE_ASSIGNED_STATES.contains(status) || Tasks.isTerminated(status)) {
task.getAssignedTask().setSlaveHost(host);
}
return IScheduledTask.build(task);
}
private IScheduledTask makeTask(IJobKey job, ScheduleStatus status) {
return makeTask(job, status, "hostA");
}
private void assertAllZero() {
for (Supplier<Long> counter : globalCounters.values()) {
assertEquals(0L, counter.get().longValue());
}
}
@Test
public void testStartsAtZero() {
expectStatusCountersInitialized();
replayAndBuild();
schedulerActivated();
assertAllZero();
}
@Test
public void testNoEarlyExport() {
replayAndBuild();
// No variables should be exported since schedulerActive is never called.
IScheduledTask taskA = makeTask(JOB_A, INIT);
changeState(taskA, PENDING);
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
}
private int getValue(String name) {
return globalCounters.get(name).get().intValue();
}
private int getValue(ScheduleStatus status) {
return getValue(TaskVars.getVarName(status));
}
@Test
public void testTaskLifeCycle() {
expectStatusCountersInitialized();
IScheduledTask taskA = makeTask(JOB_A, INIT);
expectGetHostRack("hostA", "rackA").atLeastOnce();
expectStatExport(rackStatName("rackA"));
replayAndBuild();
schedulerActivated();
changeState(makeTask(JOB_A, INIT), PENDING);
assertEquals(1, getValue(PENDING));
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
assertEquals(0, getValue(PENDING));
assertEquals(1, getValue(ASSIGNED));
taskA = makeTask(JOB_A, ASSIGNED, "hostA");
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(ASSIGNED)), RUNNING);
assertEquals(0, getValue(ASSIGNED));
assertEquals(1, getValue(RUNNING));
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(RUNNING)), FINISHED);
assertEquals(0, getValue(RUNNING));
assertEquals(1, getValue(FINISHED));
assertEquals(0, getValue(rackStatName("rackA")));
vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(
IScheduledTask.build(taskA.newBuilder().setStatus(FINISHED)))));
assertAllZero();
}
@Test
public void testStaticVetoGroup() {
expectStatusCountersInitialized();
expectStatExport(STATIC_COUNTER);
expectStatExport(INSUFFICIENT_RESOURCES_COUNTER);
replayAndBuild();
schedulerActivated();
applyVeto(Veto.insufficientResources("ram", 500),
Veto.insufficientResources("cpu", 500));
assertEquals(1, getValue(STATIC_COUNTER));
assertEquals(2, getValue(INSUFFICIENT_RESOURCES_COUNTER));
}
@Test
public void testDynamicVetoGroup() {
expectStatusCountersInitialized();
expectStatExport(DYNAMIC_COUNTER);
expectStatExport(LIMIT_NOT_SATISFIED_COUNTER);
replayAndBuild();
schedulerActivated();
applyVeto(Veto.unsatisfiedLimit("constraint"));
assertEquals(1, getValue(DYNAMIC_COUNTER));
assertEquals(1, getValue(LIMIT_NOT_SATISFIED_COUNTER));
}
@Test
public void testMixedVetoGroup() {
expectStatusCountersInitialized();
expectStatExport(MIXED_COUNTER);
expectStatExport(LIMIT_NOT_SATISFIED_COUNTER);
expectStatExport(INSUFFICIENT_RESOURCES_COUNTER);
replayAndBuild();
schedulerActivated();
applyVeto(Veto.unsatisfiedLimit("constraint"),
Veto.insufficientResources("ram", 500));
assertEquals(1, getValue(MIXED_COUNTER));
assertEquals(1, getValue(LIMIT_NOT_SATISFIED_COUNTER));
assertEquals(1, getValue(INSUFFICIENT_RESOURCES_COUNTER));
}
@Test
public void testLoadsFromStorage() {
expectStatusCountersInitialized();
expectGetHostRack("hostA", "rackA").atLeastOnce();
expectGetHostRack("hostB", "rackB").atLeastOnce();
expectStatExport(rackStatName("rackA"));
expectStatExport(rackStatName("rackB"));
IScheduledTask failedTask = makeTask(JOB_B, FAILED, "hostB");
expectStatExport(jobStatName(failedTask, FAILED), untrackedProvider);
replayAndBuild();
schedulerActivated(
makeTask(JOB_A, PENDING),
makeTask(JOB_A, RUNNING, "hostA"),
makeTask(JOB_A, FINISHED, "hostA"),
makeTask(JOB_B, PENDING),
failedTask);
assertEquals(2, getValue(PENDING));
assertEquals(1, getValue(RUNNING));
assertEquals(1, getValue(FINISHED));
assertEquals(1, getValue(FAILED));
assertEquals(0, getValue(rackStatName("rackA")));
assertEquals(0, getValue(rackStatName("rackB")));
assertEquals(1, getValue(jobStatName(failedTask, FAILED)));
}
private IExpectationSetters<?> expectGetHostRack(String host, String rackToReturn) {
IHostAttributes attributes = IHostAttributes.build(new HostAttributes()
.setHost(host)
.setAttributes(ImmutableSet.of(
new Attribute().setName("rack").setValues(ImmutableSet.of(rackToReturn)))));
return expect(storageUtil.attributeStore.getHostAttributes(host))
.andReturn(Optional.of(attributes));
}
@Test
public void testLostCounters() {
expectStatusCountersInitialized();
expectGetHostRack("host1", "rackA").atLeastOnce();
expectGetHostRack("host2", "rackB").atLeastOnce();
expectGetHostRack("host3", "rackB").atLeastOnce();
expectStatExport(rackStatName("rackA"));
expectStatExport(rackStatName("rackB"));
IScheduledTask a = makeTask(JOB_A, RUNNING, "host1");
IScheduledTask b = makeTask(JOB_B, RUNNING, "host2");
IJobKey jobD = JobKeys.from(JOB_A.getRole(), JOB_A.getEnvironment(), "jobD");
IScheduledTask c = makeTask(jobD, RUNNING, "host3");
IScheduledTask d = makeTask(jobD, RUNNING, "host1");
expectStatExport(jobStatName(a, LOST), untrackedProvider);
expectStatExport(jobStatName(b, LOST), untrackedProvider);
expectStatExport(jobStatName(c, LOST), untrackedProvider);
replayAndBuild();
schedulerActivated();
changeState(a, LOST);
changeState(b, LOST);
changeState(c, LOST);
changeState(d, LOST);
assertEquals(2, getValue(rackStatName("rackA")));
assertEquals(2, getValue(rackStatName("rackB")));
assertEquals(1, getValue(jobStatName(a, LOST)));
assertEquals(1, getValue(jobStatName(b, LOST)));
assertEquals(2, getValue(jobStatName(c, LOST)));
}
@Test
public void testRackMissing() {
expectStatusCountersInitialized();
expect(storageUtil.attributeStore.getHostAttributes("a"))
.andReturn(Optional.empty());
IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
expectStatExport(jobStatName(a, LOST), untrackedProvider);
replayAndBuild();
schedulerActivated();
changeState(a, LOST);
// Since no attributes are stored for the host, a variable is not exported/updated.
}
@Test
public void testAllVetoGroupsCovered() {
replayAndBuild();
for (VetoGroup group : EnumSet.complementOf(EnumSet.of(VetoGroup.EMPTY))) {
assertNotNull("Unknown VetoGroup value: " + group, VETO_GROUPS_TO_COUNTERS.get(group));
}
}
}