| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.storage.log; |
| |
| import java.util.Map; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| |
| import org.apache.aurora.common.stats.Stats; |
| import org.apache.aurora.common.util.testing.FakeBuildInfo; |
| import org.apache.aurora.common.util.testing.FakeClock; |
| import org.apache.aurora.gen.Attribute; |
| import org.apache.aurora.gen.BatchJobUpdateStrategy; |
| import org.apache.aurora.gen.CronCollisionPolicy; |
| import org.apache.aurora.gen.HostAttributes; |
| import org.apache.aurora.gen.HostMaintenanceRequest; |
| import org.apache.aurora.gen.Identity; |
| import org.apache.aurora.gen.InstanceTaskConfig; |
| import org.apache.aurora.gen.JobConfiguration; |
| import org.apache.aurora.gen.JobInstanceUpdateEvent; |
| import org.apache.aurora.gen.JobKey; |
| import org.apache.aurora.gen.JobUpdate; |
| import org.apache.aurora.gen.JobUpdateAction; |
| import org.apache.aurora.gen.JobUpdateDetails; |
| import org.apache.aurora.gen.JobUpdateEvent; |
| import org.apache.aurora.gen.JobUpdateInstructions; |
| import org.apache.aurora.gen.JobUpdateKey; |
| import org.apache.aurora.gen.JobUpdateSettings; |
| import org.apache.aurora.gen.JobUpdateState; |
| import org.apache.aurora.gen.JobUpdateStatus; |
| import org.apache.aurora.gen.JobUpdateStrategy; |
| import org.apache.aurora.gen.JobUpdateSummary; |
| import org.apache.aurora.gen.MaintenanceMode; |
| import org.apache.aurora.gen.PercentageSlaPolicy; |
| import org.apache.aurora.gen.Range; |
| import org.apache.aurora.gen.SlaPolicy; |
| import org.apache.aurora.gen.storage.QuotaConfiguration; |
| import org.apache.aurora.gen.storage.SchedulerMetadata; |
| import org.apache.aurora.gen.storage.Snapshot; |
| import org.apache.aurora.gen.storage.StoredCronJob; |
| import org.apache.aurora.gen.storage.StoredJobUpdateDetails; |
| import org.apache.aurora.scheduler.base.JobKeys; |
| import org.apache.aurora.scheduler.base.TaskTestUtil; |
| import org.apache.aurora.scheduler.resources.ResourceBag; |
| import org.apache.aurora.scheduler.storage.Storage; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; |
| import org.apache.aurora.scheduler.storage.durability.Loader; |
| import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; |
| import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; |
| import org.apache.aurora.scheduler.storage.entities.IHostAttributes; |
| import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest; |
| import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; |
| import org.apache.aurora.scheduler.storage.entities.IJobKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; |
| import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.storage.entities.ITaskConfig; |
| import org.apache.aurora.scheduler.storage.mem.MemStorageModule; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo; |
| import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL; |
| import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag; |
| import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_RESTORE; |
| import static org.apache.aurora.scheduler.storage.log.SnapshotterImpl.SNAPSHOT_SAVE; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| |
| public class SnapshotterImplIT { |
| |
| private static final long NOW = 10335463456L; |
| private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job"); |
| |
| private Storage storage; |
| private SnapshotterImpl snapshotter; |
| |
| private void setUpStore() { |
| storage = MemStorageModule.newEmptyStorage(); |
| FakeClock clock = new FakeClock(); |
| clock.setNowMillis(NOW); |
| snapshotter = new SnapshotterImpl(generateBuildInfo(), clock); |
| Stats.flush(); |
| } |
| |
| @Test |
| public void testBackfill() { |
| setUpStore(); |
| storage.write((NoResult.Quiet) stores -> |
| Loader.load( |
| stores, |
| THRIFT_BACKFILL, |
| snapshotter.asStream(makeNonBackfilled()).map(Edit::op))); |
| |
| assertEquals(expected(), storage.write(snapshotter::from)); |
| assertSnapshotRestoreStats(1L); |
| assertSnapshotSaveStats(1L); |
| } |
| |
| private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY); |
| private static final ITaskConfig TASK_CONFIG = TaskTestUtil.makeConfig(JOB_KEY); |
| private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(new JobConfiguration() |
| .setKey(new JobKey("owner", "env", "name")) |
| .setOwner(new Identity("user")) |
| .setCronSchedule("* * * * *") |
| .setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING) |
| .setInstanceCount(1) |
| .setTaskConfig(TASK_CONFIG.newBuilder())); |
| private static final String ROLE = "role"; |
| private static final IResourceAggregate QUOTA = |
| ThriftBackfill.backfillResourceAggregate(aggregateFromBag(ResourceBag.LARGE).newBuilder()); |
| private static final IHostAttributes ATTRIBUTES = IHostAttributes.build( |
| new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value")))) |
| .setMode(MaintenanceMode.NONE) |
| .setSlaveId("slave id")); |
| private static final String FRAMEWORK_ID = "framework_id"; |
| private static final Map<String, String> METADATA = ImmutableMap.of( |
| FakeBuildInfo.DATE, FakeBuildInfo.DATE, |
| FakeBuildInfo.GIT_REVISION, FakeBuildInfo.GIT_REVISION, |
| FakeBuildInfo.GIT_TAG, FakeBuildInfo.GIT_TAG); |
| private static final IJobUpdateKey UPDATE_ID = |
| IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "updateId1")); |
| private static final IJobUpdateDetails UPDATE = IJobUpdateDetails.build(new JobUpdateDetails() |
| .setUpdate(new JobUpdate() |
| .setInstructions(new JobUpdateInstructions() |
| .setDesiredState(new InstanceTaskConfig() |
| .setTask(TASK_CONFIG.newBuilder()) |
| .setInstances(ImmutableSet.of(new Range(0, 7)))) |
| .setInitialState(ImmutableSet.of( |
| new InstanceTaskConfig() |
| .setInstances(ImmutableSet.of(new Range(0, 1))) |
| .setTask(TASK_CONFIG.newBuilder()))) |
| .setSettings(new JobUpdateSettings() |
| .setBlockIfNoPulsesAfterMs(500) |
| .setUpdateStrategy( |
| JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(1))) |
| .setMaxPerInstanceFailures(1) |
| .setMaxFailedInstances(1) |
| .setMinWaitInInstanceRunningMs(200) |
| .setRollbackOnFailure(true) |
| .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0))))) |
| .setSummary(new JobUpdateSummary() |
| .setState(new JobUpdateState().setStatus(JobUpdateStatus.ERROR)) |
| .setUser("user") |
| .setKey(UPDATE_ID.newBuilder()))) |
| .setUpdateEvents(ImmutableList.of(new JobUpdateEvent() |
| .setUser("user") |
| .setMessage("message") |
| .setStatus(JobUpdateStatus.ERROR))) |
| .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent() |
| .setAction(JobUpdateAction.INSTANCE_UPDATED)))); |
| private static final IHostMaintenanceRequest HOST_MAINTENANCE_REQUEST = |
| IHostMaintenanceRequest.build( |
| new HostMaintenanceRequest() |
| .setHost("host") |
| .setDefaultSlaPolicy(SlaPolicy.percentageSlaPolicy( |
| new PercentageSlaPolicy() |
| .setPercentage(95) |
| .setDurationSecs(1800))) |
| .setTimeoutSecs(1800)); |
| |
| private Snapshot expected() { |
| return new Snapshot() |
| .setTimestamp(NOW) |
| .setTasks(ImmutableSet.of(TASK.newBuilder())) |
| .setQuotaConfigurations(ImmutableSet.of(new QuotaConfiguration(ROLE, QUOTA.newBuilder()))) |
| .setHostAttributes(ImmutableSet.of(ATTRIBUTES.newBuilder())) |
| .setCronJobs(ImmutableSet.of(new StoredCronJob(CRON_JOB.newBuilder()))) |
| .setSchedulerMetadata(new SchedulerMetadata(FRAMEWORK_ID, METADATA)) |
| .setJobUpdateDetails(ImmutableSet.of( |
| new StoredJobUpdateDetails().setDetails(UPDATE.newBuilder()))) |
| .setHostMaintenanceRequests(ImmutableSet.of(HOST_MAINTENANCE_REQUEST.newBuilder())); |
| } |
| |
| private Snapshot makeNonBackfilled() { |
| return expected(); |
| } |
| |
| private void assertSnapshotSaveStats(long count) { |
| for (String stat : snapshotter.snapshotFieldNames()) { |
| assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read()); |
| assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total")); |
| } |
| } |
| |
| private void assertSnapshotRestoreStats(long count) { |
| for (String stat : snapshotter.snapshotFieldNames()) { |
| assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read()); |
| assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total")); |
| } |
| } |
| } |