blob: cbe8f02feb1aa7dde80d0cad79a18a4c06ec8e3e [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.durability;
import java.util.stream.Stream;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
import org.apache.aurora.gen.storage.SaveQuota;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Loader {
private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
private Loader() {
// Utility class.
}
/**
* Loads a sequence of storage operations into the provided stores, applying backfills.
*
* @param stores Stores to populate.
* @param backfill Backfill mechanism to use.
* @param edits Edits to apply.
*/
public static void load(
MutableStoreProvider stores,
ThriftBackfill backfill,
Stream<Edit> edits) {
edits.forEach(edit -> load(stores, backfill, edit));
}
private static void load(MutableStoreProvider stores, ThriftBackfill backfill, Edit edit) {
if (edit.isDeleteAll()) {
LOG.info("Resetting storage");
stores.getCronJobStore().deleteJobs();
stores.getUnsafeTaskStore().deleteAllTasks();
stores.getQuotaStore().deleteQuotas();
stores.getAttributeStore().deleteHostAttributes();
stores.getJobUpdateStore().deleteAllUpdates();
stores.getHostMaintenanceStore().deleteHostMaintenanceRequests();
return;
}
Op op = edit.getOp();
switch (op.getSetField()) {
case SAVE_FRAMEWORK_ID:
stores.getSchedulerStore().saveFrameworkId(op.getSaveFrameworkId().getId());
break;
case SAVE_CRON_JOB:
stores.getCronJobStore().saveAcceptedJob(
backfill.backfillJobConfiguration(op.getSaveCronJob().getJobConfig()));
break;
case REMOVE_JOB:
stores.getCronJobStore().removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
break;
case REMOVE_LOCK:
case SAVE_LOCK:
// TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
break;
case SAVE_TASKS:
stores.getUnsafeTaskStore().saveTasks(backfill.backfillTasks(op.getSaveTasks().getTasks()));
break;
case REMOVE_TASKS:
stores.getUnsafeTaskStore().deleteTasks(op.getRemoveTasks().getTaskIds());
break;
case SAVE_QUOTA:
SaveQuota saveQuota = op.getSaveQuota();
stores.getQuotaStore().saveQuota(
saveQuota.getRole(),
ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
break;
case REMOVE_QUOTA:
stores.getQuotaStore().removeQuota(op.getRemoveQuota().getRole());
break;
case SAVE_HOST_ATTRIBUTES:
HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
// Prior to commit 5cf760b, the store would persist maintenance mode changes for
// unknown hosts. 5cf760b began rejecting these, but the storage may still
// contain entries with a null slave ID.
if (attributes.isSetSlaveId()) {
stores.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
} else {
LOG.info("Dropping host attributes with no agent ID: " + attributes);
}
break;
case SAVE_JOB_UPDATE:
stores.getJobUpdateStore().saveJobUpdate(
backfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()));
break;
case SAVE_JOB_UPDATE_EVENT:
SaveJobUpdateEvent jobEvent = op.getSaveJobUpdateEvent();
stores.getJobUpdateStore().saveJobUpdateEvent(
IJobUpdateKey.build(jobEvent.getKey()),
IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
break;
case SAVE_JOB_INSTANCE_UPDATE_EVENT:
SaveJobInstanceUpdateEvent instanceEvent = op.getSaveJobInstanceUpdateEvent();
stores.getJobUpdateStore().saveJobInstanceUpdateEvent(
IJobUpdateKey.build(instanceEvent.getKey()),
IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
break;
case PRUNE_JOB_UPDATE_HISTORY:
LOG.info("Dropping prune operation. Updates will be pruned later.");
break;
case REMOVE_JOB_UPDATE:
stores.getJobUpdateStore().removeJobUpdates(
IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()));
break;
case SAVE_HOST_MAINTENANCE_REQUEST:
stores.getHostMaintenanceStore().saveHostMaintenanceRequest(
IHostMaintenanceRequest
.build(op.getSaveHostMaintenanceRequest().getHostMaintenanceRequest()));
break;
case REMOVE_HOST_MAINTENANCE_REQUEST:
stores.getHostMaintenanceStore().removeHostMaintenanceRequest(
op.getRemoveHostMaintenanceRequest().getHost());
break;
default:
throw new IllegalArgumentException("Unrecognized op type " + op.getSetField());
}
}
}