blob: 1c811e3a179ab51c34f833e9f4e01d65ea9f3f6a [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.durability;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveHostAttributes;
import org.apache.aurora.gen.storage.SaveTasks;
/**
 * Records a sequence of mutations to the storage.
 *
 * <p>When a newly added op has the same type as the most recently recorded op, the recorder
 * attempts to fold it into that op instead of appending it, which keeps the recorded sequence
 * compact. Folding mutates the previously recorded {@link Op} instance in place.
 */
class TransactionRecorder {
  private final List<Op> ops = Lists.newArrayList();

  /**
   * Records an op, coalescing it into the most recently recorded op when possible.
   *
   * <p>If the op cannot be coalesced it is appended by reference, and may itself be mutated by
   * a later call that coalesces into it.
   *
   * @param op The op to record.
   */
  void add(Op op) {
    Op prior = Iterables.getLast(ops, null);
    if (prior == null || !coalesce(prior, op)) {
      ops.add(op);
    }
  }

  /**
   * Gets the ops recorded so far, in the order they were recorded (after coalescing).
   *
   * @return The backing list itself, not a copy; later calls to {@link #add(Op)} are visible
   *         through it.
   */
  List<Op> getOps() {
    return ops;
  }

  /**
   * Tries to coalesce a new op into the prior to compact the binary representation and increase
   * batching.
   *
   * @param prior The previous op.
   * @param next The next op to be added.
   * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
   */
  private boolean coalesce(Op prior, Op next) {
    // An op with no field set has nothing to merge.  Both sides must be checked: if only prior
    // were unset, getSetField() would return null and the type comparison below would throw.
    if (!prior.isSet() || !next.isSet()) {
      return false;
    }

    Op._Fields priorType = prior.getSetField();
    if (!priorType.equals(next.getSetField())) {
      // Only ops of the same type are candidates for coalescing.
      return false;
    }

    switch (priorType) {
      case SAVE_FRAMEWORK_ID:
        // The most recent framework ID replaces the earlier one.
        prior.setSaveFrameworkId(next.getSaveFrameworkId());
        return true;

      case SAVE_TASKS:
        coalesce(prior.getSaveTasks(), next.getSaveTasks());
        return true;

      case REMOVE_TASKS:
        coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
        return true;

      case SAVE_HOST_ATTRIBUTES:
        // Host attributes are only merged when both ops refer to the same host.
        return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());

      default:
        // Every other op type is recorded as a separate entry.
        return false;
    }
  }

  /**
   * Merges the tasks of {@code next} into {@code prior}.  When both ops carry a task with the
   * same task ID, the definition from {@code next} replaces the one in {@code prior}.
   *
   * @param prior The previously recorded op, mutated in place.
   * @param next The op being folded into {@code prior}.
   */
  private void coalesce(SaveTasks prior, SaveTasks next) {
    if (next.isSetTasks()) {
      if (prior.isSetTasks()) {
        // It is an expected invariant that an operation may reference a task (identified by
        // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
        // the most recent task definition overrides the prior operation.
        Map<String, ScheduledTask> coalesced = Maps.newHashMap();
        for (ScheduledTask task : prior.getTasks()) {
          coalesced.put(task.getAssignedTask().getTaskId(), task);
        }
        for (ScheduledTask task : next.getTasks()) {
          coalesced.put(task.getAssignedTask().getTaskId(), task);
        }
        prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
      } else {
        // Prior carried no tasks, so it simply adopts the tasks of next.
        prior.setTasks(next.getTasks());
      }
    }
  }

  /**
   * Merges the task IDs of {@code next} into {@code prior}, producing the union of both sets.
   *
   * @param prior The previously recorded op, mutated in place.
   * @param next The op being folded into {@code prior}.
   */
  private void coalesce(RemoveTasks prior, RemoveTasks next) {
    if (next.isSetTaskIds()) {
      if (prior.isSetTaskIds()) {
        prior.setTaskIds(ImmutableSet.<String>builder()
            .addAll(prior.getTaskIds())
            .addAll(next.getTaskIds())
            .build());
      } else {
        // Prior carried no task IDs, so it simply adopts the IDs of next.
        prior.setTaskIds(next.getTaskIds());
      }
    }
  }

  /**
   * Replaces the attributes in {@code prior} with those in {@code next} when both ops refer to
   * the same host.
   *
   * @param prior The previously recorded op, mutated in place on success.
   * @param next The op being folded into {@code prior}.
   * @return {@code true} if both ops name the same host and the attributes were replaced,
   *         {@code false} if the hosts differ and nothing was changed.
   */
  private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
    if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
      prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
      return true;
    }
    return false;
  }
}