blob: 5b10350b295e2dbff7c9237cf6d35aebd3fbdb18 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.mem;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
/**
 * An in-memory task store.
 *
 * <p>Thread-safe under the umbrella of the externally-managed {@code Storage} lock; see the
 * comments on the internal data structures below for the (deliberately relaxed) consistency
 * guarantees.
 */
class MemTaskStore implements TaskStore.Mutable {
  private static final Logger LOG = LoggerFactory.getLogger(MemTaskStore.class);

  /**
   * Binding annotation for the query duration threshold at or above which a task query is
   * considered slow and logged at INFO level (faster queries are logged at DEBUG).
   */
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.PARAMETER})
  @Qualifier
  public @interface SlowQueryThreshold { }

  // Queries taking at least this many nanoseconds are logged at INFO rather than DEBUG.
  private final long slowQueryThresholdNanos;

  // Extracts the job keys a query is scoped to, if any, for use by the job secondary index.
  private static final Function<Query.Builder, Optional<Set<IJobKey>>> QUERY_TO_JOB_KEY =
      JobKeys::from;

  // Extracts the slave hosts a query is scoped to, if any, for use by the host secondary index.
  private static final Function<Query.Builder, Optional<Set<String>>> QUERY_TO_SLAVE_HOST =
      query -> query.get().getSlaveHosts().isEmpty()
          ? Optional.empty()
          : Optional.of(query.get().getSlaveHosts());

  // Since this class operates under the API and umbrella of {@link Storage}, it is expected to be
  // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage
  // lock is secured. To adhere to that, these data structures are individually thread-safe, but
  // we don't lock across them because of the relaxed consistency guarantees.
  // Note that this behavior makes it possible to receive query results that are not sane,
  // specifically when a secondary key value is changed. In other words, we currently don't always
  // support the invariant that a query by slave host yields a result with all tasks matching that
  // slave host. This is deemed acceptable due to the fact that secondary key values are rarely
  // mutated in practice, and mutated in ways that are not impacted by this behavior.
  private final Map<String, Task> tasks = Maps.newConcurrentMap();
  private final SecondaryIndex<IJobKey> jobIndex;
  private final List<SecondaryIndex<?>> secondaryIndices;

  // An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
  // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
  // rather than the task), but we intuit this detail here for performance reasons.
  private final Interner<TaskConfig, String> configInterner = new Interner<>();

  // Counters for query-path selection: by explicit task ID vs. full-store scan.
  private final AtomicLong taskQueriesById;
  private final AtomicLong taskQueriesAll;

  @Inject
  MemTaskStore(
      StatsProvider statsProvider,
      @SlowQueryThreshold Amount<Long, Time> slowQueryThreshold) {

    jobIndex = new SecondaryIndex<>(Tasks::getJob, QUERY_TO_JOB_KEY, statsProvider, "job");
    secondaryIndices = ImmutableList.of(
        jobIndex,
        new SecondaryIndex<>(
            Tasks::scheduledToSlaveHost,
            QUERY_TO_SLAVE_HOST,
            statsProvider,
            "host"));
    slowQueryThresholdNanos = slowQueryThreshold.as(Time.NANOSECONDS);
    taskQueriesById = statsProvider.makeCounter("task_queries_by_id");
    taskQueriesAll = statsProvider.makeCounter("task_queries_all");
  }

  @Timed("mem_storage_fetch_task")
  @Override
  public Optional<IScheduledTask> fetchTask(String taskId) {
    requireNonNull(taskId);
    return Optional.ofNullable(tasks.get(taskId)).map(t -> t.storedTask);
  }

  @Timed("mem_storage_fetch_tasks")
  @Override
  public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
    requireNonNull(query);

    long start = System.nanoTime();
    Collection<IScheduledTask> result = matches(query);
    long durationNanos = System.nanoTime() - start;

    // Surface slow queries at INFO so they are visible without enabling DEBUG logging.
    boolean infoLevel = durationNanos >= slowQueryThresholdNanos;
    long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
    String message = "Query took {} ms: {}";
    if (infoLevel) {
      LOG.info(message, time, query.get());
    } else if (LOG.isDebugEnabled()) {
      LOG.debug(message, time, query.get());
    }

    return result;
  }

  @Timed("mem_storage_get_job_keys")
  @Override
  public Set<IJobKey> getJobKeys() {
    return jobIndex.keySet();
  }

  // Canonicalizes a task's TaskConfig through the interner while wrapping it for storage.
  private final Function<IScheduledTask, Task> toTask = task -> new Task(task, configInterner);

  @Timed("mem_storage_save_tasks")
  @Override
  public void saveTasks(Set<IScheduledTask> newTasks) {
    requireNonNull(newTasks);
    Preconditions.checkState(Tasks.ids(newTasks).size() == newTasks.size(),
        "Proposed new tasks would create task ID collision.");

    Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
    tasks.putAll(Maps.uniqueIndex(canonicalized, task -> Tasks.id(task.storedTask)));
    for (SecondaryIndex<?> index : secondaryIndices) {
      index.insert(Iterables.transform(canonicalized, task -> task.storedTask));
    }
  }

  @Timed("mem_storage_delete_all_tasks")
  @Override
  public void deleteAllTasks() {
    tasks.clear();
    for (SecondaryIndex<?> index : secondaryIndices) {
      index.clear();
    }
    configInterner.clear();
  }

  @Timed("mem_storage_delete_tasks")
  @Override
  public void deleteTasks(Set<String> taskIds) {
    requireNonNull(taskIds);
    for (String id : taskIds) {
      Task removed = tasks.remove(id);
      if (removed != null) {
        for (SecondaryIndex<?> index : secondaryIndices) {
          index.remove(removed.storedTask);
        }
        // Drop the interner association so an unreferenced canonical config can be collected.
        configInterner.removeAssociation(
            removed.storedTask.getAssignedTask().getTask().newBuilder(),
            id);
      }
    }
  }

  @Timed("mem_storage_mutate_task")
  @Override
  public Optional<IScheduledTask> mutateTask(
      String taskId,
      Function<IScheduledTask, IScheduledTask> mutator) {

    return fetchTask(taskId).map(original -> {
      IScheduledTask maybeMutated = mutator.apply(original);
      requireNonNull(maybeMutated);
      if (!original.equals(maybeMutated)) {
        Preconditions.checkState(
            Tasks.id(original).equals(Tasks.id(maybeMutated)),
            "A task's ID may not be mutated.");
        tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
        for (SecondaryIndex<?> index : secondaryIndices) {
          index.replace(original, maybeMutated);
        }
      }

      return maybeMutated;
    });
  }

  /**
   * Resolves a set of task IDs against the primary (by-ID) map, retaining only tasks that also
   * satisfy the supplied filter.  IDs with no stored task are silently skipped.
   *
   * @param taskIds IDs to look up.
   * @param filter Additional predicate each matched task must satisfy.
   * @return An unmodifiable collection of matching tasks.
   */
  private Collection<IScheduledTask> fromIdIndex(
      Iterable<String> taskIds,
      Predicate<IScheduledTask> filter) {

    Collection<IScheduledTask> result = new ArrayDeque<>();
    for (String id : taskIds) {
      Task match = tasks.get(id);
      if (match != null && filter.apply(match.storedTask)) {
        result.add(match.storedTask);
      }
    }
    // Wrap for consistency with the full-scan path in matches() - callers must not mutate
    // query results.
    return Collections.unmodifiableCollection(result);
  }

  /**
   * Evaluates a query, choosing the cheapest available access path: explicit task IDs, then any
   * applicable secondary index, then a full scan as a last resort.
   *
   * @param query Query to evaluate.
   * @return An unmodifiable collection of matching tasks.
   */
  private Collection<IScheduledTask> matches(Query.Builder query) {
    // Apply the query against the working set.
    Predicate<IScheduledTask> filter = Util.queryFilter(query);

    if (query.get().getTaskIds().isEmpty()) {
      for (SecondaryIndex<?> index : secondaryIndices) {
        Optional<Iterable<String>> indexMatch = index.getMatches(query);
        if (indexMatch.isPresent()) {
          // Note: we could leverage multiple indexes here if the query applies to them, by
          // choosing to intersect the results. Given current indexes and query profile, this is
          // unlikely to offer much improvement, though.
          return fromIdIndex(indexMatch.get(), filter);
        }
      }

      // No indices match, fall back to a full scan.
      taskQueriesAll.incrementAndGet();
      Collection<IScheduledTask> result = new ArrayDeque<>();
      for (Task task : tasks.values()) {
        if (filter.apply(task.storedTask)) {
          result.add(task.storedTask);
        }
      }
      return Collections.unmodifiableCollection(result);
    } else {
      taskQueriesById.incrementAndGet();
      return fromIdIndex(query.get().getTaskIds(), filter);
    }
  }

  /**
   * Storage wrapper for a task that canonicalizes the task's {@link TaskConfig} through the
   * config interner on construction, so equivalent configs share one canonical instance.
   */
  private static class Task {
    private final IScheduledTask storedTask;

    Task(IScheduledTask storedTask, Interner<TaskConfig, String> interner) {
      // Remove any stale association first so re-saving a task with a changed config does not
      // leave the task ID pinned to the old canonical config.
      interner.removeAssociation(
          storedTask.getAssignedTask().getTask().newBuilder(),
          Tasks.id(storedTask));
      TaskConfig canonical = interner.addAssociation(
          storedTask.getAssignedTask().getTask().newBuilder(),
          Tasks.id(storedTask));
      ScheduledTask builder = storedTask.newBuilder();
      builder.getAssignedTask().setTask(canonical);
      this.storedTask = IScheduledTask.build(builder);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Task)) {
        return false;
      }

      Task other = (Task) o;
      return storedTask.equals(other.storedTask);
    }

    @Override
    public int hashCode() {
      return storedTask.hashCode();
    }
  }

  @VisibleForTesting
  static String getIndexSizeStatName(String name) {
    return "task_store_index_" + name + "_items";
  }

  /**
   * A non-unique secondary index on the task store. Maps a custom key type to a set of task IDs.
   *
   * @param <K> Key type.
   */
  private static class SecondaryIndex<K> {
    // Synchronized multimap; compound operations must manually synchronize on the map itself
    // (see replace() and the lookup function).
    private final Multimap<K, String> index =
        Multimaps.synchronizedSetMultimap(HashMultimap.create());

    private final Function<IScheduledTask, K> indexer;
    private final Function<Query.Builder, Optional<Set<K>>> queryExtractor;
    private final AtomicLong hitCount;

    /**
     * Creates a secondary index that will extract keys from tasks using the provided indexer.
     *
     * @param indexer Indexing function.
     * @param queryExtractor Function to extract the keys relevant to a query.
     * @param statsProvider Stats system to export metrics to.
     * @param name Name to use in stats keys.
     */
    SecondaryIndex(
        Function<IScheduledTask, K> indexer,
        Function<Query.Builder, Optional<Set<K>>> queryExtractor,
        StatsProvider statsProvider,
        String name) {

      this.indexer = indexer;
      this.queryExtractor = queryExtractor;
      this.hitCount = statsProvider.makeCounter("task_queries_by_" + name);
      statsProvider.makeGauge(getIndexSizeStatName(name), index::size);
    }

    /** Returns an immutable snapshot of all keys currently present in the index. */
    Set<K> keySet() {
      return ImmutableSet.copyOf(index.keySet());
    }

    /** Inserts each of {@code tasks} into the index; tasks with a null key are skipped. */
    void insert(Iterable<IScheduledTask> tasks) {
      for (IScheduledTask task : tasks) {
        insert(task);
      }
    }

    void insert(IScheduledTask task) {
      K key = indexer.apply(task);
      if (key != null) {
        index.put(key, Tasks.id(task));
      }
    }

    /** Removes all entries from the index. */
    void clear() {
      index.clear();
    }

    /** Removes a task's entry from the index, if its key is non-null. */
    void remove(IScheduledTask task) {
      K key = indexer.apply(task);
      if (key != null) {
        index.remove(key, Tasks.id(task));
      }
    }

    /** Atomically (with respect to index readers) swaps {@code old} for {@code replacement}. */
    void replace(IScheduledTask old, IScheduledTask replacement) {
      synchronized (index) {
        remove(old);
        insert(replacement);
      }
    }

    // Resolves a set of index keys to the union of their associated task IDs, counting the
    // lookup as an index hit.
    private final Function<Set<K>, Iterable<String>> lookup = keys -> {
      hitCount.incrementAndGet();
      Collection<String> matches = new ArrayDeque<>();
      // Iteration over a synchronized multimap's views must hold the map's monitor.
      synchronized (index) {
        for (K key : keys) {
          matches.addAll(index.get(key));
        }
      }
      return matches;
    };

    /**
     * Attempts to answer a query from this index.
     *
     * @param query Query to evaluate.
     * @return Matching task IDs if the query is scoped to this index's key type, absent otherwise.
     */
    Optional<Iterable<String>> getMatches(Query.Builder query) {
      return queryExtractor.apply(query).map(lookup);
    }
  }
}