blob: e7612383283a690f02efebc934abd9ede2378ac6 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.log;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.SlidingStats;
import org.apache.aurora.common.stats.SlidingStats.Timeable;
import org.apache.aurora.common.util.BuildInfo;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SaveCronJob;
import org.apache.aurora.gen.storage.SaveFrameworkId;
import org.apache.aurora.gen.storage.SaveHostAttributes;
import org.apache.aurora.gen.storage.SaveHostMaintenanceRequest;
import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
import org.apache.aurora.gen.storage.SaveJobUpdate;
import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
import org.apache.aurora.gen.storage.SaveQuota;
import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredCronJob;
import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
/**
* Snapshot store implementation that delegates to underlying snapshot stores by
* extracting/applying fields in a snapshot thrift struct.
*/
public class SnapshotterImpl implements Snapshotter {
@VisibleForTesting
static final String SNAPSHOT_SAVE = "snapshot_save_";
@VisibleForTesting
static final String SNAPSHOT_RESTORE = "snapshot_restore_";
private static final Logger LOG = LoggerFactory.getLogger(SnapshotterImpl.class);
private static final String HOST_ATTRIBUTES_FIELD = "hosts";
private static final String QUOTA_FIELD = "quota";
private static final String TASK_FIELD = "tasks";
private static final String CRON_FIELD = "crons";
private static final String JOB_UPDATE_FIELD = "job_updates";
private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata";
private static final String HOST_MAINTENANCE_REQUESTS_FIELDS = "host_maintenance_requests";
@VisibleForTesting
Set<String> snapshotFieldNames() {
return snapshotFields.stream()
.map(SnapshotField::getName)
.collect(Collectors.toSet());
}
private final List<SnapshotField> snapshotFields = ImmutableList.of(
new SnapshotField() {
@Override
String getName() {
return HOST_ATTRIBUTES_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setHostAttributes(
IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getHostAttributesSize() > 0) {
return snapshot.getHostAttributes().stream()
.map(attributes -> Op.saveHostAttributes(
new SaveHostAttributes().setHostAttributes(attributes)));
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return TASK_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setTasks(
IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getTasksSize() > 0) {
return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return CRON_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
jobs.add(new StoredCronJob(config.newBuilder()));
}
snapshot.setCronJobs(jobs.build());
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getCronJobsSize() > 0) {
return snapshot.getCronJobs().stream()
.map(job -> Op.saveCronJob(
new SaveCronJob().setJobConfig(job.getJobConfiguration())));
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return SCHEDULER_METADATA_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
// SchedulerMetadata is updated outside of the static list of SnapshotFields
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.isSetSchedulerMetadata()
&& snapshot.getSchedulerMetadata().isSetFrameworkId()) {
// No delete necessary here since this is a single value.
return Stream.of(Op.saveFrameworkId(
new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return QUOTA_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
for (Map.Entry<String, IResourceAggregate> entry
: store.getQuotaStore().fetchQuotas().entrySet()) {
quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
}
snapshot.setQuotaConfigurations(quotas.build());
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getQuotaConfigurationsSize() > 0) {
return snapshot.getQuotaConfigurations().stream()
.map(quota -> Op.saveQuota(new SaveQuota()
.setRole(quota.getRole())
.setQuota(quota.getQuota())));
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return JOB_UPDATE_FIELD;
}
@Override
void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setJobUpdateDetails(
store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
.map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
.collect(Collectors.toSet()));
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getJobUpdateDetailsSize() > 0) {
return snapshot.getJobUpdateDetails().stream()
.flatMap(details -> {
Stream<Op> parent = Stream.of(Op.saveJobUpdate(
new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
Stream<Op> jobEvents;
if (details.getDetails().getUpdateEventsSize() > 0) {
jobEvents = details.getDetails().getUpdateEvents().stream()
.map(event -> Op.saveJobUpdateEvent(
new SaveJobUpdateEvent()
.setKey(details.getDetails().getUpdate().getSummary().getKey())
.setEvent(event)));
} else {
jobEvents = Stream.empty();
}
Stream<Op> instanceEvents;
if (details.getDetails().getInstanceEventsSize() > 0) {
instanceEvents = details.getDetails().getInstanceEvents().stream()
.map(event -> Op.saveJobInstanceUpdateEvent(
new SaveJobInstanceUpdateEvent()
.setKey(details.getDetails().getUpdate().getSummary().getKey())
.setEvent(event)));
} else {
instanceEvents = Stream.empty();
}
return Streams.concat(parent, jobEvents, instanceEvents);
});
}
return Stream.empty();
}
},
new SnapshotField() {
@Override
String getName() {
return HOST_MAINTENANCE_REQUESTS_FIELDS;
}
@Override
void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
snapshot.setHostMaintenanceRequests(
IHostMaintenanceRequest.toBuildersSet(
storeProvider.getHostMaintenanceStore().getHostMaintenanceRequests()));
}
@Override
Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getHostMaintenanceRequestsSize() > 0) {
return snapshot.getHostMaintenanceRequests().stream()
.map(request -> Op.saveHostMaintenanceRequest(
new SaveHostMaintenanceRequest().setHostMaintenanceRequest(request)));
}
return Stream.empty();
}
}
);
private final BuildInfo buildInfo;
private final Clock clock;
@Inject
public SnapshotterImpl(BuildInfo buildInfo, Clock clock) {
this.buildInfo = requireNonNull(buildInfo);
this.clock = requireNonNull(clock);
}
private Snapshot createSnapshot(StoreProvider storeProvider) {
Snapshot snapshot = new Snapshot();
// Capture timestamp to signify the beginning of a snapshot operation, apply after in case
// one of the field closures is mean and tries to apply a timestamp.
long timestamp = clock.nowMillis();
for (SnapshotField field : snapshotFields) {
field.save(storeProvider, snapshot);
}
SchedulerMetadata metadata = new SchedulerMetadata()
.setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null))
.setDetails(buildInfo.getProperties());
snapshot.setSchedulerMetadata(metadata);
snapshot.setTimestamp(timestamp);
return snapshot;
}
@Timed("snapshot_create")
@Override
public Snapshot from(StoreProvider stores) {
return createSnapshot(stores);
}
@Timed("snapshot_apply")
@Override
public Stream<Op> asStream(Snapshot snapshot) {
requireNonNull(snapshot);
LOG.info("Restoring snapshot.");
return snapshotFields.stream()
.flatMap(field -> field.streamFrom(snapshot));
}
abstract class SnapshotField {
abstract String getName();
abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
abstract Stream<Op> doStreamFrom(Snapshot snapshot);
void save(StoreProvider storeProvider, Snapshot snapshot) {
stats.getUnchecked(SNAPSHOT_SAVE + getName())
.time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
}
Stream<Op> streamFrom(Snapshot snapshot) {
return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
}
}
private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
new CacheLoader<String, SlidingStats>() {
@Override
public SlidingStats load(String name) throws Exception {
return new SlidingStats(name, "nanos");
}
});
}