// blob: 06618affb1d1f5c7a1cc4a0470523169a4f1ed3e [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.pruning;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
/**
* Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
* transitioning into one of the inactive states.
*/
public class TaskHistoryPruner implements EventSubscriber {
  private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);

  private static final String FATAL_ERROR_FORMAT =
      "Unexpected problem pruning task history for %s. Triggering shutdown";

  /** Name of the counter obtained from the {@link StatsProvider} to track pruned tasks. */
  @VisibleForTesting
  static final String TASKS_PRUNED = "tasks_pruned";

  private final ScheduledExecutorService executor;
  private final StateManager stateManager;
  private final Clock clock;
  private final HistoryPrunerSettings settings;
  private final Storage storage;
  private final Lifecycle lifecycle;
  private final TaskEventBatchWorker batchWorker;
  private final AtomicLong prunedTasksCount;

  /**
   * Accepts a task only when its latest event is at least
   * {@code settings.minRetentionThresholdMillis} older than the current clock reading.
   *
   * <p>Deliberately an anonymous class rather than a lambda: a lambda in a field initializer
   * may not read the blank final fields {@code clock} and {@code settings} by simple name.
   */
  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
    @Override
    public boolean apply(IScheduledTask task) {
      return Tasks.getLatestEvent(task).getTimestamp()
          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
    }
  };

  /**
   * Pruning configuration; all durations are normalized to milliseconds on construction.
   */
  static class HistoryPrunerSettings {
    private final long pruneThresholdMillis;
    private final long minRetentionThresholdMillis;
    private final int perJobHistoryGoal;

    /**
     * @param inactivePruneThreshold Time after which an inactive task is pruned.
     * @param minRetentionThreshold Minimum time an inactive task is kept before it may be pruned.
     * @param perJobHistoryGoal Number of inactive tasks per job above which pruning is attempted.
     */
    HistoryPrunerSettings(
        Amount<Long, Time> inactivePruneThreshold,
        Amount<Long, Time> minRetentionThreshold,
        int perJobHistoryGoal) {

      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
      this.perJobHistoryGoal = perJobHistoryGoal;
    }
  }

  @Inject
  TaskHistoryPruner(
      @AsyncExecutor ScheduledExecutorService executor,
      StateManager stateManager,
      Clock clock,
      HistoryPrunerSettings settings,
      Storage storage,
      Lifecycle lifecycle,
      TaskEventBatchWorker batchWorker,
      StatsProvider statsProvider) {

    this.executor = requireNonNull(executor);
    this.stateManager = requireNonNull(stateManager);
    this.clock = requireNonNull(clock);
    this.settings = requireNonNull(settings);
    this.storage = requireNonNull(storage);
    this.lifecycle = requireNonNull(lifecycle);
    this.batchWorker = requireNonNull(batchWorker);
    // Validate explicitly, like every other dependency, instead of relying on the implicit
    // NullPointerException from the method call.
    this.prunedTasksCount = requireNonNull(statsProvider).makeCounter(TASKS_PRUNED);
  }

  /**
   * Computes how long to wait before pruning a task whose latest event happened at the given time.
   *
   * <p>The result is the prune threshold minus the time already elapsed since the event (elapsed
   * time is clamped at zero, so an event timestamp in the future does not extend the wait), and is
   * never less than the minimum retention threshold.
   *
   * @param taskEventTimestampMillis Timestamp of the task event, in milliseconds.
   * @return Delay in milliseconds before the task should be pruned.
   */
  @VisibleForTesting
  long calculateTimeout(long taskEventTimestampMillis) {
    long elapsedMillis = Math.max(0L, clock.nowMillis() - taskEventTimestampMillis);
    return Math.max(
        settings.minRetentionThresholdMillis,
        settings.pruneThresholdMillis - elapsedMillis);
  }

  /**
   * When triggered, records an inactive task state change.
   *
   * @param change Event when a task changes state.
   */
  @Subscribe
  public void recordStateChange(TaskStateChange change) {
    if (Tasks.isTerminated(change.getNewState())) {
      // A real transition is timed from "now"; otherwise the timestamp of the task's last
      // recorded event is used as the basis.
      long timeoutBasis = change.isTransition()
          ? clock.nowMillis()
          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
      registerInactiveTask(
          Tasks.getJob(change.getTask()),
          change.getTaskId(),
          calculateTimeout(timeoutBasis));
    }
  }

  /**
   * Submits deletion of the given tasks to the batch worker.
   *
   * @param taskIds IDs of the tasks to delete.
   */
  private void deleteTasks(final Set<String> taskIds) {
    LOG.debug("Pruning inactive tasks {}", taskIds);
    batchWorker.execute(storeProvider -> {
      stateManager.deleteTasks(storeProvider, taskIds);
      // Count only after the delete call has returned normally, rather than as soon as the work
      // has been handed to the batch worker.
      prunedTasksCount.addAndGet(taskIds.size());
      return BatchWorker.NO_RESULT;
    });
  }

  /**
   * Builds the query selecting all tasks of a job that are in a terminal state.
   *
   * @param jobKey Job to scope the query to.
   * @return Query builder for the job's terminal-state tasks.
   */
  @VisibleForTesting
  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
  }

  /**
   * Schedules pruning work for a task that has become inactive: a delayed deletion of the task
   * itself, plus an immediate check of whether its job exceeds the per-job history goal.
   *
   * @param jobKey Job the task belongs to.
   * @param taskId ID of the inactive task.
   * @param timeRemaining Delay in milliseconds before the task itself is deleted.
   */
  private void registerInactiveTask(
      final IJobKey jobKey,
      final String taskId,
      long timeRemaining) {

    LOG.debug("Prune task {} in {} ms.", taskId, timeRemaining);
    executor.schedule(
        shutdownOnError(
            lifecycle,
            LOG,
            String.format(FATAL_ERROR_FORMAT, "task: " + taskId),
            () -> {
              LOG.info("Pruning expired inactive task {}", taskId);
              deleteTasks(ImmutableSet.of(taskId));
            }),
        timeRemaining,
        TimeUnit.MILLISECONDS);

    executor.execute(
        shutdownOnError(
            lifecycle,
            LOG,
            String.format(FATAL_ERROR_FORMAT, "job: " + jobKey),
            () -> pruneExcessJobHistory(jobKey)));
  }

  /**
   * Deletes inactive tasks of a job beyond the configured per-job history goal, restricted to
   * tasks that satisfy {@link #safeToDelete}.
   *
   * @param jobKey Job whose inactive task history is examined.
   */
  private void pruneExcessJobHistory(IJobKey jobKey) {
    Iterable<IScheduledTask> inactiveTasks =
        Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
    int tasksToPrune = Iterables.size(inactiveTasks) - settings.perJobHistoryGoal;
    if (tasksToPrune <= 0) {
      // At or below the history goal: nothing to do.
      return;
    }

    // NOTE(review): presumably LATEST_ACTIVITY orders the least recently active tasks first, so
    // the oldest eligible tasks are the ones pruned -- confirm against Tasks.LATEST_ACTIVITY.
    Set<String> toPrune = FluentIterable
        .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
        .filter(safeToDelete)
        .limit(tasksToPrune)
        .transform(Tasks::id)
        .toSet();
    if (!toPrune.isEmpty()) {
      deleteTasks(toPrune);
    }
  }
}