/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.aurora.scheduler.pruning;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.inject.Inject;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.AbstractScheduledService;

import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.storage.JobUpdateStore.TERMINAL_STATES;

/**
* Prunes per-job update history on a periodic basis.
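 * <p>
 * On each pass, updates already in a terminal state are removed when they are older than the
 * configured retention window, when their job no longer has any tasks in the task store, or when
 * a job holds more terminal updates than the configured per-job maximum (oldest pruned first).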
*/
class JobUpdateHistoryPruner extends AbstractScheduledService {
  private static final Logger LOG = LoggerFactory.getLogger(JobUpdateHistoryPruner.class);

  @VisibleForTesting
  static final String JOB_UPDATES_PRUNED = "job_updates_pruned";

  private final Clock clock;
private final Storage storage;
private final HistoryPrunerSettings settings;
private final AtomicLong prunedUpdatesCount;
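
  /**
   * Settings for the pruner: how frequently to prune, how long terminal updates are retained, and
   * the maximum number of terminal updates to keep per job. Illustrative example (values here are
   * arbitrary, not defaults):
   * {@code new HistoryPrunerSettings(Amount.of(1L, Time.HOURS), Amount.of(2L, Time.DAYS), 10)}.
   */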
static class HistoryPrunerSettings {
private final Amount<Long, Time> pruneInterval;
private final Amount<Long, Time> maxHistorySize;
    private final int maxUpdatesPerJob;

    HistoryPrunerSettings(
Amount<Long, Time> pruneInterval,
Amount<Long, Time> maxHistorySize,
int maxUpdatesPerJob) {
this.pruneInterval = requireNonNull(pruneInterval);
this.maxHistorySize = requireNonNull(maxHistorySize);
this.maxUpdatesPerJob = maxUpdatesPerJob;
}
  }

  @Inject
JobUpdateHistoryPruner(
Clock clock,
Storage storage,
HistoryPrunerSettings settings,
StatsProvider statsProvider) {
this.clock = requireNonNull(clock);
this.storage = requireNonNull(storage);
this.settings = requireNonNull(settings);
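    // Exports a counter tracking the total number of job update records pruned over time.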
this.prunedUpdatesCount = statsProvider.makeCounter(JOB_UPDATES_PRUNED);
}
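
  // Uses a fixed-delay schedule: the prune interval is both the initial delay and the delay
  // between the end of one pruning pass and the start of the next.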
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(
settings.pruneInterval.as(Time.MILLISECONDS),
settings.pruneInterval.as(Time.MILLISECONDS),
TimeUnit.MILLISECONDS);
  }

  @VisibleForTesting
void runForTest() {
runOneIteration();
  }

  @Timed("job_update_store_prune_history")
@Override
protected void runOneIteration() {
storage.write((NoResult.Quiet) storeProvider -> {
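      // Fetch summaries of all job updates that have already reached a terminal state; only these
      // are candidates for pruning.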
List<IJobUpdateSummary> completedUpdates = storeProvider.getJobUpdateStore()
.fetchJobUpdates(IJobUpdateQuery.build(
new JobUpdateQuery().setUpdateStatuses(TERMINAL_STATES)))
.stream()
.map(u -> u.getUpdate().getSummary())
.collect(Collectors.toList());
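      // Updates created before this cutoff have exceeded the configured retention window.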
long cutoff = clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS);
Predicate<IJobUpdateSummary> expiredFilter =
s -> s.getState().getCreatedTimestampMs() < cutoff;
// Set up a predicate to detect updates that have no tasks left in the store.
Set<IJobKey> currentJobs = storeProvider.getTaskStore().getJobKeys();
Predicate<IJobUpdateSummary> orphanFilter = u -> !currentJobs.contains(u.getKey().getJob());
ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
// Gather updates based on time threshold or ones that have been orphaned.
pruneBuilder.addAll(completedUpdates
.stream()
.filter(expiredFilter.or(orphanFilter))
.map(IJobUpdateSummary::getKey)
.collect(Collectors.toList()));
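      // Enforce the per-job limit: group updates by job and mark the oldest entries beyond
      // maxUpdatesPerJob for removal.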
Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
// Avoid counting to-be-removed expired updates.
completedUpdates.stream().filter(expiredFilter.negate()).iterator(),
s -> s.getKey().getJob());
updatesByJob.asMap().values().forEach(updates -> {
if (updates.size() > settings.maxUpdatesPerJob) {
Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
.onResultOf(s -> s.getState().getCreatedTimestampMs());
pruneBuilder.addAll(creationOrder
.leastOf(updates, updates.size() - settings.maxUpdatesPerJob)
.stream()
.map(IJobUpdateSummary::getKey)
.iterator());
}
});
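      // Remove everything selected above, then bump the pruned counter and log the result.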
Set<IJobUpdateKey> pruned = pruneBuilder.build();
if (!pruned.isEmpty()) {
storeProvider.getJobUpdateStore().removeJobUpdates(pruned);
}
prunedUpdatesCount.addAndGet(pruned.size());
LOG.info(pruned.isEmpty()
? "No job update history to prune."
: "Pruned job update history: " + Joiner.on(",").join(pruned));
});
}
}