blob: 1375c0853d5df379d50a2b2498f9a99db8f29421 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.pruning;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobUpdate;
import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateState;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.junit.Assert.assertEquals;
public class JobUpdateHistoryPrunerTest {
private Storage storage;
@Before
public void setUp() {
storage = MemStorageModule.newEmptyStorage();
}
@Test
public void testPruneHistory() {
IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
IJobKey missingKey = JobKeys.from("missing-role", "missing-env", "missing-name");
IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L, 123L, true);
IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L, 124L, true);
IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L, 125L, true);
IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L, true);
IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L, 123L, true);
IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L, 125L, true);
IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"), ROLLING_FORWARD, 126L, 126L, true);
makeAndSave(makeKey(missingKey, "u1"), ERROR, 123L, 123L, false);
long pruningThreshold = 120;
// Only orphaned updates pruned.
pruneHistory(3, pruningThreshold);
assertRetainedUpdates(update1, update2, update3, update4, update5, update6, update7);
// 2 updates pruned.
pruneHistory(2, pruningThreshold);
assertRetainedUpdates(update1, update3, update4, update5, update6, update7);
// 3 updates pruned.
pruneHistory(1, pruningThreshold);
assertRetainedUpdates(update1, update4, update6, update7);
// The oldest update is pruned.
pruneHistory(1, 126);
assertRetainedUpdates(update1, update4, update7);
// Nothing survives the 0 per job count.
pruneHistory(0, pruningThreshold);
assertRetainedUpdates(update1, update7);
}
private void pruneHistory(int retainCount, long pruningThresholdMs) {
FakeClock clock = new FakeClock();
clock.setNowMillis(100 + pruningThresholdMs);
JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
clock,
storage,
new HistoryPrunerSettings(
Amount.of(1L, Time.DAYS),
Amount.of(100L, Time.MILLISECONDS),
retainCount),
new FakeStatsProvider());
pruner.runForTest();
}
private void assertRetainedUpdates(IJobUpdateDetails... updates) {
storage.read(store -> {
assertEquals(
Stream.of(updates).map(u -> u.getUpdate().getSummary().getKey())
.collect(Collectors.toSet()),
store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
.map(u -> u.getUpdate().getSummary().getKey())
.collect(Collectors.toSet()));
return null;
});
}
private static IJobUpdateKey makeKey(String id) {
return makeKey(JOB, id);
}
private static IJobUpdateKey makeKey(IJobKey job, String id) {
return IJobUpdateKey.build(new JobUpdateKey().setJob(job.newBuilder()).setId(id));
}
private IJobUpdateDetails makeAndSave(
IJobUpdateKey key,
JobUpdateStatus status,
long createdMs,
long lastMs,
boolean hasTasks) {
IJobUpdateDetails update = IJobUpdateDetails.build(new JobUpdateDetails()
.setUpdateEvents(ImmutableList.of(
new JobUpdateEvent(status, lastMs)
.setUser("user")
.setMessage("message")
))
.setInstanceEvents(ImmutableList.of())
.setUpdate(new JobUpdate()
.setInstructions(new JobUpdateInstructions()
.setDesiredState(new InstanceTaskConfig()
.setTask(new TaskConfig())
.setInstances(ImmutableSet.of(new Range()))))
.setSummary(new JobUpdateSummary()
.setKey(key.newBuilder())
.setState(new JobUpdateState()
.setCreatedTimestampMs(createdMs)
.setLastModifiedTimestampMs(lastMs)
.setStatus(status)))));
storage.write((NoResult.Quiet) storeProvider -> {
JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore();
store.saveJobUpdate(update.getUpdate());
update.getUpdateEvents().forEach(event -> store.saveJobUpdateEvent(key, event));
update.getInstanceEvents().forEach(event -> store.saveJobInstanceUpdateEvent(key, event));
TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
if (hasTasks) {
taskStore.saveTasks(ImmutableSet.of(makeTask("test", key.getJob())));
}
});
return update;
}
}