// Source blob a8f43cec92b572ee41173817f155585694e83832 (repository-browser header, not part of the Java source).
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.log;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.stats.Stats;
import org.apache.aurora.common.util.testing.FakeBuildInfo;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.BatchJobUpdateStrategy;
import org.apache.aurora.gen.CronCollisionPolicy;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.HostMaintenanceRequest;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobInstanceUpdateEvent;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.JobUpdate;
import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateDetails;
import org.apache.aurora.gen.JobUpdateEvent;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateState;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateStrategy;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.PercentageSlaPolicy;
import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.SlaPolicy;
import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredCronJob;
import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.durability.Loader;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.junit.Test;
import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL;
import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_RESTORE;
import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_SAVE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
 * Integration test for {@link SnapshotterImpl} running against an in-memory {@link Storage}.
 *
 * <p>A fully populated {@link Snapshot} is replayed into an empty store, the store is snapshotted
 * again, and the result is compared with the snapshot that was loaded.
 */
public class SnapshotterImplIT {
  /** Reading of the fake clock; also the timestamp carried by the expected snapshot. */
  private static final long NOW = 10335463456L;
  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");

  // Fixture entities, one per snapshot field populated in expected().
  // Declaration order matters: later constants are built from earlier ones.
  private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
  private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY);

  private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(
      new JobConfiguration()
          .setKey(new JobKey("owner", "env", "name"))
          .setOwner(new Identity("user"))
          .setCronSchedule("* * * * *")
          .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING)
          .setInstanceCount(1)
          .setTaskConfig(TASK_CONFIG.newBuilder()));

  /** Role that owns {@link #QUOTA} in the expected snapshot. */
  private static final String ROLE = "role";
  private static final IResourceAggregate QUOTA =
      ThriftBackfill.backfillResourceAggregate(aggregateFromBag(ResourceBag.LARGE).newBuilder());

  private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
      new HostAttributes(
          "host",
          ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
          .setMode(MaintenanceMode.NONE)
          .setSlaveId("slave id"));

  private static final String FRAMEWORK_ID = "framework_id";

  /** Scheduler metadata in which every {@link FakeBuildInfo} key maps to itself. */
  private static final Map<String, String> METADATA = ImmutableMap.of(
      FakeBuildInfo.DATE, FakeBuildInfo.DATE,
      FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION,
      FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG);

  private static final IJobUpdateKey UPDATE_ID =
      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1"));

  /** A job update in the ERROR state with one update event and one instance event. */
  private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(
      new JobUpdateDetails()
          .setUpdate(new JobUpdate()
              .setInstructions(updateInstructions())
              .setSummary(updateSummary()))
          .setUpdateEvents(ImmutableList.of(
              new JobUpdateEvent()
                  .setUser("user")
                  .setMessage("message")
                  .setStatus(JobUpdateStatus.ERROR)))
          .setInstanceEvents(ImmutableList.of(
              new JobInstanceUpdateEvent()
                  .setAction(JobUpdateAction.INSTANCE_UPDATED))));

  private static final IHostMaintenanceRequest HOST_MAINTENANCE_REQUEST =
      IHostMaintenanceRequest.build(
          new HostMaintenanceRequest()
              .setHost("host")
              .setDefaultSlaPolicy(
                  SlaPolicy.percentageSlaPolicy(
                      new PercentageSlaPolicy()
                          .setPercentage(95)
                          .setDurationSecs(1800)))
              .setTimeoutSecs(1800));

  private Storage storage;
  private SnapshotterImpl snapshotter;

  /**
   * Creates an empty in-memory store and a snapshotter whose clock reads {@link #NOW}.
   *
   * <p>Invoked explicitly by the test rather than through a JUnit lifecycle annotation.
   */
  private void setUpStore() {
    storage = MemStorageModule.newEmptyStorage();

    FakeClock fixedClock = new FakeClock();
    fixedClock.setNowMillis(NOW);
    snapshotter = new SnapshotterImpl(generateBuildInfo(), fixedClock);

    // NOTE(review): presumably clears stats left over from earlier tests so the event counts
    // asserted below start from zero -- confirm against Stats.flush().
    Stats.flush();
  }

  /**
   * Replays the ops of a snapshot into the store through {@link Loader} with
   * {@code THRIFT_BACKFILL}, then checks that snapshotting the store reproduces
   * {@link #expected()} and that each restore/save stat recorded exactly one event.
   */
  @Test
  public void testBackfill() {
    setUpStore();

    // The op stream is built inside the write so it is produced within the storage transaction.
    NoResult.Quiet replaySnapshot = stores -> {
      Loader.load(
          stores,
          THRIFT_BACKFILL,
          snapshotter.asStream(makeNonBackfilled()).map(Edit::op));
    };
    storage.write(replaySnapshot);

    Snapshot wanted = expected();
    Snapshot produced = storage.write(snapshotter::from);
    assertEquals(wanted, produced);

    assertSnapshotRestoreStats(1L);
    assertSnapshotSaveStats(1L);
  }

  /** Builds the snapshot the store is expected to produce, assembled from the fixture constants. */
  private Snapshot expected() {
    QuotaConfiguration quota = new QuotaConfiguration(ROLE, QUOTA.newBuilder());
    StoredCronJob cronJob = new StoredCronJob(CRON_JOB.newBuilder());
    SchedulerMetadata metadata = new SchedulerMetadata(FRAMEWORK_ID, METADATA);
    StoredJobUpdateDetails updateDetails =
        new StoredJobUpdateDetails().setDetails(UPDATE.newBuilder());

    return new Snapshot()
        .setTimestamp(NOW)
        .setTasks(ImmutableSet.of(TASK.newBuilder()))
        .setQuotaConfigurations(ImmutableSet.of(quota))
        .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder()))
        .setCronJobs(ImmutableSet.of(cronJob))
        .setSchedulerMetadata(metadata)
        .setJobUpdateDetails(ImmutableSet.of(updateDetails))
        .setHostMaintenanceRequests(ImmutableSet.of(HOST_MAINTENANCE_REQUEST.newBuilder()));
  }

  /**
   * Returns the snapshot that is fed into the store.
   *
   * <p>Currently this is exactly {@link #expected()}: the input and the expected output of the
   * round trip are the same snapshot.
   */
  private Snapshot makeNonBackfilled() {
    return expected();
  }

  /**
   * Asserts that, for every snapshot field name, the save {@code _events} stat reads
   * {@code count} and the save {@code _nanos_total} stat is registered.
   */
  private void assertSnapshotSaveStats(long count) {
    for (String field : snapshotter.snapshotFieldNames()) {
      String statPrefix = SNAPSHOT_SAVE + field;
      assertEquals(count, Stats.getVariable(statPrefix + "_events").read());
      assertNotNull(Stats.getVariable(statPrefix + "_nanos_total"));
    }
  }

  /**
   * Asserts that, for every snapshot field name, the restore {@code _events} stat reads
   * {@code count} and the restore {@code _nanos_total} stat is registered.
   */
  private void assertSnapshotRestoreStats(long count) {
    for (String field : snapshotter.snapshotFieldNames()) {
      String statPrefix = SNAPSHOT_RESTORE + field;
      assertEquals(count, Stats.getVariable(statPrefix + "_events").read());
      assertNotNull(Stats.getVariable(statPrefix + "_nanos_total"));
    }
  }

  /** Instructions of {@link #UPDATE}: desired state, initial state and update settings. */
  private static JobUpdateInstructions updateInstructions() {
    return new JobUpdateInstructions()
        .setDesiredState(
            new InstanceTaskConfig()
                .setTask(TASK_CONFIG.newBuilder())
                .setInstances(ImmutableSet.of(new Range(0, 7))))
        .setInitialState(ImmutableSet.of(
            new InstanceTaskConfig()
                .setInstances(ImmutableSet.of(new Range(0, 1)))
                .setTask(TASK_CONFIG.newBuilder())))
        .setSettings(updateSettings());
  }

  /** Settings of {@link #UPDATE}: a batch strategy with group size 1 plus failure limits. */
  private static JobUpdateSettings updateSettings() {
    return new JobUpdateSettings()
        .setBlockIfNoPulsesAfterMs(500)
        .setUpdateStrategy(
            JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(1)))
        .setMaxPerInstanceFailures(1)
        .setMaxFailedInstances(1)
        .setMinWaitInInstanceRunningMs(200)
        .setRollbackOnFailure(true)
        .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0)));
  }

  /** Summary of {@link #UPDATE}: ERROR status, user "user", keyed by {@link #UPDATE_ID}. */
  private static JobUpdateSummary updateSummary() {
    return new JobUpdateSummary()
        .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR))
        .setUser("user")
        .setKey(UPDATE_ID.newBuilder());
  }
}