blob: d6b42e8b919864fdde22dd5ec193717d25c94ec9 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.mem;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.aurora.common.base.MorePreconditions;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateState;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import static java.util.Objects.requireNonNull;
public class MemJobUpdateStore implements JobUpdateStore.Mutable {
/** Name of the gauge registered by the constructor to report the number of stored updates. */
@VisibleForTesting
static final String UPDATE_STORE_SIZE = "mem_storage_update_size";

/**
 * Orders update details by the last-modified timestamp of their summary state, most recently
 * modified first.
 */
private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER =
    Ordering.<Long>natural()
        .onResultOf((IJobUpdateDetails details) ->
            details.getUpdate().getSummary().getState().getLastModifiedTimestampMs())
        .reverse();

/** Every stored update, indexed by its update key. */
private final Map<IJobUpdateKey, IJobUpdateDetails> updates = Maps.newConcurrentMap();
/**
 * Creates an empty store and registers a gauge named {@link #UPDATE_STORE_SIZE} whose value is
 * the current number of stored updates.
 *
 * @param statsProvider provider the size gauge is registered with
 */
@Inject
MemJobUpdateStore(StatsProvider statsProvider) {
  statsProvider.makeGauge(UPDATE_STORE_SIZE, () -> updates.size());
}
/**
 * Fetches the details of every stored update that matches {@code query}.
 *
 * @param query filtering, offset and limit criteria, applied by {@code performQuery}
 * @return the matching update details collected into a list, in the order produced by
 *     {@code performQuery}
 */
@Timed("job_update_store_fetch_details_query")
@Override
public synchronized List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
  Stream<IJobUpdateDetails> matching = performQuery(query);
  return matching.collect(Collectors.toList());
}
/**
 * Looks up the details of a single update.
 *
 * @param key key of the update to look up
 * @return the stored details, or an empty {@link Optional} when nothing is stored under
 *     {@code key}
 */
@Timed("job_update_store_fetch_details")
@Override
public synchronized Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
  IJobUpdateDetails stored = updates.get(key);
  return stored == null ? Optional.empty() : Optional.of(stored);
}
/**
 * Verifies that update instructions are complete enough to be stored.
 *
 * <p>At least one of the initial state and the desired state must be present. When an initial
 * state is present, each of its entries must have a task and a non-empty instances collection.
 * When a desired state is present, its instances must pass
 * {@code MorePreconditions.checkNotBlank} and its task must be non-null.
 *
 * @param instructions instructions of the update about to be saved
 * @throws IllegalArgumentException if both states are missing, or if an initial-state entry has
 *     no instances
 * @throws NullPointerException if an initial-state entry or the desired state has no task
 */
private static void validateInstructions(IJobUpdateInstructions instructions) {
  boolean hasInitialState = !instructions.getInitialState().isEmpty();
  boolean hasDesiredState = instructions.isSetDesiredState();

  if (!hasInitialState && !hasDesiredState) {
    throw new IllegalArgumentException(
        "Missing both initial and desired states. At least one is required.");
  }

  if (hasInitialState) {
    // Deliberately a NullPointerException rather than an IllegalArgumentException, so the
    // exception type seen by existing callers does not change.
    if (instructions.getInitialState().stream().anyMatch(t -> t.getTask() == null)) {
      throw new NullPointerException("Invalid initial instance state.");
    }
    Preconditions.checkArgument(
        instructions.getInitialState().stream().noneMatch(t -> t.getInstances().isEmpty()),
        "Invalid initial instance state ranges.");
  }

  if (hasDesiredState) {
    MorePreconditions.checkNotBlank(instructions.getDesiredState().getInstances());
    Preconditions.checkNotNull(instructions.getDesiredState().getTask());
  }
}
/**
 * Validates and stores an update under the key found in its summary, replacing anything already
 * stored under that key.
 *
 * <p>The stored details start with empty update-event and instance-event lists, and the summary
 * state is replaced by one synthesized from those (empty) lists.
 *
 * @param update update to store; must not be null and must pass instruction validation
 */
@Timed("job_update_store_save_update")
@Override
public synchronized void saveJobUpdate(IJobUpdate update) {
  requireNonNull(update);
  validateInstructions(update.getInstructions());

  JobUpdateDetails details = new JobUpdateDetails();
  details.setUpdate(update.newBuilder());
  details.setUpdateEvents(ImmutableList.of());
  details.setInstanceEvents(ImmutableList.of());

  // Whatever state the caller supplied is discarded in favor of the synthesized one.
  JobUpdateState initialState = synthesizeUpdateState(details);
  details.getUpdate().getSummary().setState(initialState);

  IJobUpdateKey storageKey = update.getSummary().getKey();
  updates.put(storageKey, IJobUpdateDetails.build(details));
}
/** Orders update events by ascending timestamp. */
private static final Ordering<JobUpdateEvent> EVENT_ORDERING =
    Ordering.natural().onResultOf(JobUpdateEvent::getTimestampMs);

/**
 * Appends an update-level event to a stored update, re-sorts its update events by timestamp and
 * re-synthesizes the summary state.
 *
 * @param key key of the update the event belongs to
 * @param event event to record
 * @throws StorageException if no update is stored under {@code key}
 */
@Timed("job_update_store_save_event")
@Override
public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
  JobUpdateDetails details = Optional.ofNullable(updates.get(key))
      .orElseThrow(() -> new StorageException("Update not found: " + key))
      .newBuilder();

  details.addToUpdateEvents(event.newBuilder());
  details.setUpdateEvents(EVENT_ORDERING.sortedCopy(details.getUpdateEvents()));

  JobUpdateState refreshedState = synthesizeUpdateState(details);
  details.getUpdate().getSummary().setState(refreshedState);

  updates.put(key, IJobUpdateDetails.build(details));
}
/** Orders instance events by ascending timestamp. */
private static final Ordering<JobInstanceUpdateEvent> INSTANCE_EVENT_ORDERING =
    Ordering.natural().onResultOf(JobInstanceUpdateEvent::getTimestampMs);

/**
 * Appends an instance-level event to a stored update, re-sorts its instance events by timestamp
 * and re-synthesizes the summary state.
 *
 * @param key key of the update the event belongs to
 * @param event instance event to record
 * @throws StorageException if no update is stored under {@code key}
 */
@Timed("job_update_store_save_instance_event")
@Override
public synchronized void saveJobInstanceUpdateEvent(
    IJobUpdateKey key,
    IJobInstanceUpdateEvent event) {

  JobUpdateDetails details = Optional.ofNullable(updates.get(key))
      .orElseThrow(() -> new StorageException("Update not found: " + key))
      .newBuilder();

  details.addToInstanceEvents(event.newBuilder());
  details.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(details.getInstanceEvents()));

  JobUpdateState refreshedState = synthesizeUpdateState(details);
  details.getUpdate().getSummary().setState(refreshedState);

  updates.put(key, IJobUpdateDetails.build(details));
}
/**
 * Removes every stored update whose key is contained in the given set. Keys with no stored
 * update are ignored.
 *
 * @param key keys of the updates to remove; must not be null
 */
@Timed("job_update_store_delete_updates")
@Override
public synchronized void removeJobUpdates(Set<IJobUpdateKey> key) {
  Set<IJobUpdateKey> doomed = requireNonNull(key);
  // The key set is a view of the map, so removing from it removes the entries themselves.
  Set<IJobUpdateKey> storedKeys = updates.keySet();
  storedKeys.removeAll(doomed);
}
/** Removes every update held by this store. */
@Timed("job_update_store_delete_all")
@Override
public synchronized void deleteAllUpdates() {
updates.clear();
}
/**
 * Derives an update's summary state from its recorded events.
 *
 * <ul>
 *   <li>The created timestamp is the timestamp of the first update event.
 *   <li>The status and last-modified timestamp come from the last update event.
 *   <li>The last-modified timestamp is raised to the timestamp of the last instance event when
 *       that is later.
 * </ul>
 *
 * <p>A field whose source event list is empty is left unset. "First" and "last" refer to list
 * position, so the lists are expected to already be in the desired order.
 *
 * @param update mutable update details whose event lists are read
 * @return a newly created state; {@code update} itself is not modified
 */
private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) {
  JobUpdateEvent earliest = Iterables.getFirst(update.getUpdateEvents(), null);
  JobUpdateEvent latest = Iterables.getLast(update.getUpdateEvents(), null);
  JobInstanceUpdateEvent latestInstance = Iterables.getLast(update.getInstanceEvents(), null);

  JobUpdateState synthesized = new JobUpdateState();
  if (earliest != null) {
    synthesized.setCreatedTimestampMs(earliest.getTimestampMs());
  }
  if (latest != null) {
    synthesized.setStatus(latest.getStatus());
    synthesized.setLastModifiedTimestampMs(latest.getTimestampMs());
  }
  if (latestInstance != null) {
    long updateLevelTimestampMs = synthesized.getLastModifiedTimestampMs();
    synthesized.setLastModifiedTimestampMs(
        Math.max(updateLevelTimestampMs, latestInstance.getTimestampMs()));
  }
  return synthesized;
}
/**
 * Builds a stream of the stored updates that satisfy {@code query}.
 *
 * <p>Each criterion that is set on the query (role, update key, job key, user, non-empty status
 * set) narrows the result; criteria that are unset are ignored. Matches are sorted by
 * {@code REVERSE_LAST_MODIFIED_ORDER}, the first {@code query.getOffset()} of them are skipped,
 * and the remainder is truncated to {@code query.getLimit()} only when the limit is positive.
 *
 * @param query filtering, offset and limit criteria
 * @return a stream over the matching update details
 * @throws IllegalArgumentException if the query offset is negative (raised by
 *     {@link Stream#skip(long)})
 */
private Stream<IJobUpdateDetails> performQuery(IJobUpdateQuery query) {
  Predicate<IJobUpdateDetails> filter = u -> true;

  // In each comparison the query-side value is the equals() receiver: it has just been checked
  // to be non-null, whereas the corresponding field of a stored update may be unset. Calling
  // equals() on the stored value instead would fail the whole query with a
  // NullPointerException as soon as one stored update lacks that field.
  if (query.getRole() != null) {
    filter = filter.and(
        u -> query.getRole().equals(u.getUpdate().getSummary().getKey().getJob().getRole()));
  }
  if (query.getKey() != null) {
    filter = filter.and(u -> query.getKey().equals(u.getUpdate().getSummary().getKey()));
  }
  if (query.getJobKey() != null) {
    filter = filter.and(
        u -> query.getJobKey().equals(u.getUpdate().getSummary().getKey().getJob()));
  }
  if (query.getUser() != null) {
    filter = filter.and(u -> query.getUser().equals(u.getUpdate().getSummary().getUser()));
  }
  if (query.getUpdateStatuses() != null && !query.getUpdateStatuses().isEmpty()) {
    filter = filter.and(u -> query.getUpdateStatuses()
        .contains(u.getUpdate().getSummary().getState().getStatus()));
  }

  // TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as
  // such here. The behavior is carried over from DbJobUpdateStore; determine if it is desired.
  Stream<IJobUpdateDetails> matches = updates.values().stream()
      .filter(filter)
      .sorted(REVERSE_LAST_MODIFIED_ORDER)
      .skip(query.getOffset());

  // A non-positive limit means "no limit".
  if (query.getLimit() > 0) {
    matches = matches.limit(query.getLimit());
  }
  return matches;
}
}