| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.storage.durability; |
| |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Functions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| |
| import org.apache.aurora.common.testing.easymock.EasyMockTest; |
| import org.apache.aurora.gen.AssignedTask; |
| import org.apache.aurora.gen.Attribute; |
| import org.apache.aurora.gen.HostAttributes; |
| import org.apache.aurora.gen.HostMaintenanceRequest; |
| import org.apache.aurora.gen.InstanceTaskConfig; |
| import org.apache.aurora.gen.JobConfiguration; |
| import org.apache.aurora.gen.JobInstanceUpdateEvent; |
| import org.apache.aurora.gen.JobUpdate; |
| import org.apache.aurora.gen.JobUpdateAction; |
| import org.apache.aurora.gen.JobUpdateEvent; |
| import org.apache.aurora.gen.JobUpdateInstructions; |
| import org.apache.aurora.gen.JobUpdateKey; |
| import org.apache.aurora.gen.JobUpdateSettings; |
| import org.apache.aurora.gen.JobUpdateStatus; |
| import org.apache.aurora.gen.JobUpdateSummary; |
| import org.apache.aurora.gen.MaintenanceMode; |
| import org.apache.aurora.gen.PercentageSlaPolicy; |
| import org.apache.aurora.gen.Range; |
| import org.apache.aurora.gen.ResourceAggregate; |
| import org.apache.aurora.gen.ScheduleStatus; |
| import org.apache.aurora.gen.ScheduledTask; |
| import org.apache.aurora.gen.SlaPolicy; |
| import org.apache.aurora.gen.TaskConfig; |
| import org.apache.aurora.gen.storage.Op; |
| import org.apache.aurora.gen.storage.PruneJobUpdateHistory; |
| import org.apache.aurora.gen.storage.RemoveHostMaintenanceRequest; |
| import org.apache.aurora.gen.storage.RemoveJob; |
| import org.apache.aurora.gen.storage.RemoveJobUpdates; |
| import org.apache.aurora.gen.storage.RemoveLock; |
| import org.apache.aurora.gen.storage.RemoveQuota; |
| import org.apache.aurora.gen.storage.RemoveTasks; |
| import org.apache.aurora.gen.storage.SaveCronJob; |
| import org.apache.aurora.gen.storage.SaveFrameworkId; |
| import org.apache.aurora.gen.storage.SaveHostAttributes; |
| import org.apache.aurora.gen.storage.SaveHostMaintenanceRequest; |
| import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; |
| import org.apache.aurora.gen.storage.SaveJobUpdate; |
| import org.apache.aurora.gen.storage.SaveJobUpdateEvent; |
| import org.apache.aurora.gen.storage.SaveLock; |
| import org.apache.aurora.gen.storage.SaveQuota; |
| import org.apache.aurora.gen.storage.SaveTasks; |
| import org.apache.aurora.scheduler.base.JobKeys; |
| import org.apache.aurora.scheduler.base.TaskTestUtil; |
| import org.apache.aurora.scheduler.base.Tasks; |
| import org.apache.aurora.scheduler.events.EventSink; |
| import org.apache.aurora.scheduler.events.PubsubEvent; |
| import org.apache.aurora.scheduler.resources.ResourceTestUtil; |
| import org.apache.aurora.scheduler.storage.AttributeStore; |
| import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; |
| import org.apache.aurora.scheduler.storage.durability.Persistence.Edit; |
| import org.apache.aurora.scheduler.storage.entities.IHostAttributes; |
| import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest; |
| import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; |
| import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdate; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; |
| import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; |
| import org.easymock.Capture; |
| import org.easymock.EasyMock; |
| import org.easymock.IAnswer; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.gen.Resource.diskMb; |
| import static org.apache.aurora.gen.Resource.numCpus; |
| import static org.apache.aurora.gen.Resource.ramMb; |
| import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig; |
| import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; |
| import static org.easymock.EasyMock.anyObject; |
| import static org.easymock.EasyMock.capture; |
| import static org.easymock.EasyMock.expect; |
| import static org.easymock.EasyMock.expectLastCall; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| public class DurableStorageTest extends EasyMockTest { |
| |
| private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name"); |
| private static final IJobUpdateKey UPDATE_ID = |
| IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), "testUpdateId")); |
| |
| private DurableStorage durableStorage; |
| private Persistence persistence; |
| private StorageTestUtil storageUtil; |
| private EventSink eventSink; |
| |
| @Before |
| public void setUp() { |
| persistence = createMock(Persistence.class); |
| storageUtil = new StorageTestUtil(this); |
| eventSink = createMock(EventSink.class); |
| |
| durableStorage = new DurableStorage( |
| persistence, |
| storageUtil.storage, |
| storageUtil.schedulerStore, |
| storageUtil.jobStore, |
| storageUtil.taskStore, |
| storageUtil.quotaStore, |
| storageUtil.attributeStore, |
| storageUtil.jobUpdateStore, |
| storageUtil.hostMaintenanceStore, |
| eventSink, |
| new ReentrantLock(), |
| TaskTestUtil.THRIFT_BACKFILL); |
| |
| storageUtil.storage.prepare(); |
| } |
| |
| @Test |
| public void testStart() throws Exception { |
| // We should initialize persistence. |
| persistence.prepare(); |
| |
| // Our start should recover persistence and then forward to the underlying storage start of the |
| // supplied initialization logic. |
| AtomicBoolean initialized = new AtomicBoolean(false); |
| MutateWork.NoResult.Quiet initializationLogic = provider -> { |
| // Creating a mock and expecting apply(storeProvider) does not work here for whatever |
| // reason. |
| initialized.set(true); |
| }; |
| |
| Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture(); |
| storageUtil.storage.write(capture(recoverAndInitializeWork)); |
| expectLastCall().andAnswer(() -> { |
| recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider); |
| return null; |
| }); |
| |
| Capture<MutateWork<Void, RuntimeException>> initializationWork = createCapture(); |
| expect(storageUtil.storage.write(capture(initializationWork))).andAnswer( |
| () -> { |
| initializationWork.getValue().apply(storageUtil.mutableStoreProvider); |
| return null; |
| }); |
| |
| // Populate all Op types. |
| buildReplayOps(); |
| |
| storageUtil.expectStoreAccesses(); |
| |
| control.replay(); |
| |
| durableStorage.prepare(); |
| durableStorage.start(initializationLogic); |
| assertTrue(initialized.get()); |
| } |
| |
| private void buildReplayOps() throws Exception { |
| ImmutableSet.Builder<Edit> builder = ImmutableSet.builder(); |
| |
| builder.add(Edit.op(Op.saveFrameworkId(new SaveFrameworkId("bob")))); |
| storageUtil.schedulerStore.saveFrameworkId("bob"); |
| |
| JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig()); |
| JobConfiguration expectedJob = |
| new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder()); |
| SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob); |
| builder.add(Edit.op(Op.saveCronJob(cronJob))); |
| storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob)); |
| |
| RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder()); |
| builder.add(Edit.op(Op.removeJob(removeJob))); |
| storageUtil.jobStore.removeJob(JOB_KEY); |
| |
| ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder(); |
| actualTask.getAssignedTask().setTask(nonBackfilledConfig()); |
| IScheduledTask expectedTask = makeTask("id", JOB_KEY); |
| SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask)); |
| builder.add(Edit.op(Op.saveTasks(saveTasks))); |
| storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask)); |
| |
| // Side-effects from a storage reset, caused by a snapshot. |
| builder.add(Edit.deleteAll()); |
| storageUtil.jobStore.deleteJobs(); |
| storageUtil.taskStore.deleteAllTasks(); |
| storageUtil.quotaStore.deleteQuotas(); |
| storageUtil.attributeStore.deleteHostAttributes(); |
| storageUtil.jobUpdateStore.deleteAllUpdates(); |
| storageUtil.hostMaintenanceStore.deleteHostMaintenanceRequests(); |
| |
| RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1")); |
| builder.add(Edit.op(Op.removeTasks(removeTasks))); |
| storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds()); |
| |
| ResourceAggregate resourceAggregate = new ResourceAggregate() |
| .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))); |
| SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), resourceAggregate); |
| builder.add(Edit.op(Op.saveQuota(saveQuota))); |
| storageUtil.quotaStore.saveQuota( |
| saveQuota.getRole(), |
| IResourceAggregate.build(resourceAggregate)); |
| |
| builder.add(Edit.op(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())))); |
| storageUtil.quotaStore.removeQuota(JOB_KEY.getRole()); |
| |
| // This entry lacks a slave ID, and should therefore be discarded. |
| SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes() |
| .setHost("host1") |
| .setMode(MaintenanceMode.DRAINED)); |
| builder.add(Edit.op(Op.saveHostAttributes(hostAttributes1))); |
| |
| SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes() |
| .setHost("host2") |
| .setSlaveId("slave2") |
| .setMode(MaintenanceMode.DRAINED)); |
| builder.add(Edit.op(Op.saveHostAttributes(hostAttributes2))); |
| expect(storageUtil.attributeStore.saveHostAttributes( |
| IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true); |
| |
| builder.add(Edit.op(Op.saveLock(new SaveLock()))); |
| // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959. |
| |
| builder.add(Edit.op(Op.removeLock(new RemoveLock()))); |
| // TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959. |
| |
| JobUpdate actualUpdate = new JobUpdate() |
| .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder())) |
| .setInstructions(new JobUpdateInstructions() |
| .setInitialState( |
| ImmutableSet.of(new InstanceTaskConfig().setTask(nonBackfilledConfig()))) |
| .setDesiredState(new InstanceTaskConfig().setTask(nonBackfilledConfig()))); |
| JobUpdate expectedUpdate = actualUpdate.deepCopy(); |
| expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder()); |
| expectedUpdate.getInstructions().getInitialState() |
| .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder())); |
| SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate); |
| builder.add(Edit.op(Op.saveJobUpdate(saveUpdate))); |
| storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate)); |
| |
| SaveJobUpdateEvent saveUpdateEvent = |
| new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder()); |
| builder.add(Edit.op(Op.saveJobUpdateEvent(saveUpdateEvent))); |
| storageUtil.jobUpdateStore.saveJobUpdateEvent( |
| UPDATE_ID, |
| IJobUpdateEvent.build(saveUpdateEvent.getEvent())); |
| |
| SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent( |
| new JobInstanceUpdateEvent(), |
| UPDATE_ID.newBuilder()); |
| builder.add(Edit.op(Op.saveJobInstanceUpdateEvent(saveInstanceEvent))); |
| storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent( |
| UPDATE_ID, |
| IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent())); |
| |
| builder.add(Edit.op(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)))); |
| // No expectation - this op is ignored. |
| |
| builder.add(Edit.op(Op.removeJobUpdate( |
| new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))))); |
| storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID)); |
| |
| expect(persistence.recover()).andReturn(builder.build().stream()); |
| } |
| |
| private TaskConfig nonBackfilledConfig() { |
| // When more fields have to be backfilled |
| // modify this method. |
| return makeConfig(JOB_KEY).newBuilder(); |
| } |
| |
| abstract class AbstractStorageFixture { |
| private final AtomicBoolean runCalled = new AtomicBoolean(false); |
| |
| AbstractStorageFixture() { |
| // Prevent otherwise silent noop tests that forget to call run(). |
| addTearDown(new TearDown() { |
| @Override |
| public void tearDown() { |
| assertTrue(runCalled.get()); |
| } |
| }); |
| } |
| |
| void run() throws Exception { |
| runCalled.set(true); |
| |
| // Expect basic start operations. |
| |
| // Initialize persistence. |
| persistence.prepare(); |
| |
| // Replay the ops and perform and supplied initializationWork. |
| // Simulate NOOP initialization work |
| // Creating a mock and expecting apply(storeProvider) does not work here for whatever |
| // reason. |
| MutateWork.NoResult.Quiet initializationLogic = storeProvider -> { |
| // No-op. |
| }; |
| |
| Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = createCapture(); |
| storageUtil.storage.write(capture(recoverAndInitializeWork)); |
| expectLastCall().andAnswer(() -> { |
| recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider); |
| return null; |
| }); |
| |
| expect(persistence.recover()).andReturn(Stream.empty()); |
| Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture(); |
| expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer( |
| () -> { |
| recoveryWork.getValue().apply(storageUtil.mutableStoreProvider); |
| return null; |
| }); |
| |
| // Setup custom test expectations. |
| setupExpectations(); |
| |
| control.replay(); |
| |
| // Start the system. |
| durableStorage.prepare(); |
| durableStorage.start(initializationLogic); |
| |
| // Exercise the system. |
| runTest(); |
| } |
| |
| protected void setupExpectations() throws Exception { |
| // Default to no expectations. |
| } |
| |
| protected abstract void runTest(); |
| } |
| |
| abstract class AbstractMutationFixture extends AbstractStorageFixture { |
| @Override |
| protected void runTest() { |
| durableStorage.write((Quiet) AbstractMutationFixture.this::performMutations); |
| } |
| |
| protected abstract void performMutations(MutableStoreProvider storeProvider); |
| } |
| |
| private void expectPersist(Op op, Op... ops) { |
| try { |
| // Workaround for comparing streams. |
| persistence.persist(anyObject()); |
| expectLastCall().andAnswer((IAnswer<Void>) () -> { |
| assertEquals( |
| ImmutableList.<Op>builder().add(op).add(ops).build(), |
| ((Stream<Op>) EasyMock.getCurrentArguments()[0]).collect(Collectors.toList())); |
| |
| return null; |
| }); |
| } catch (Persistence.PersistenceException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Test |
| public void testSaveFrameworkId() throws Exception { |
| String frameworkId = "bob"; |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.schedulerStore.saveFrameworkId(frameworkId); |
| expectPersist(Op.saveFrameworkId(new SaveFrameworkId(frameworkId))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getSchedulerStore().saveFrameworkId(frameworkId); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveAcceptedJob() throws Exception { |
| IJobConfiguration jobConfig = |
| IJobConfiguration.build(new JobConfiguration().setKey(JOB_KEY.newBuilder())); |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobStore.saveAcceptedJob(jobConfig); |
| expectPersist(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getCronJobStore().saveAcceptedJob(jobConfig); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveJob() throws Exception { |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobStore.removeJob(JOB_KEY); |
| expectPersist(Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getCronJobStore().removeJob(JOB_KEY); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveTasks() throws Exception { |
| Set<IScheduledTask> tasks = ImmutableSet.of(task("a", ScheduleStatus.INIT)); |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.taskStore.saveTasks(tasks); |
| expectPersist(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks)))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getUnsafeTaskStore().saveTasks(tasks); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testMutateTasks() throws Exception { |
| String taskId = "fred"; |
| Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); |
| Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING)); |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); |
| expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testNestedTransactions() throws Exception { |
| String taskId = "fred"; |
| Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); |
| Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING)); |
| ImmutableSet<String> tasksToRemove = ImmutableSet.of("b"); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); |
| |
| storageUtil.taskStore.deleteTasks(tasksToRemove); |
| |
| expectPersist( |
| Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))), |
| Op.removeTasks(new RemoveTasks(tasksToRemove))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); |
| |
| durableStorage.write((NoResult.Quiet) |
| innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveAndMutateTasks() throws Exception { |
| String taskId = "fred"; |
| Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); |
| Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT)); |
| Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING)); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.taskStore.saveTasks(saved); |
| |
| // Nested transaction with result. |
| expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); |
| |
| // Resulting stream operation. |
| expectPersist(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getUnsafeTaskStore().saveTasks(saved); |
| assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception { |
| String taskId = "fred"; |
| Function<IScheduledTask, IScheduledTask> mutation = Functions.identity(); |
| Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT)); |
| Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING)); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.taskStore.saveTasks(saved); |
| |
| // Nested transaction with result. |
| expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated); |
| |
| // Resulting stream operation. |
| expectPersist(Op.saveTasks(new SaveTasks( |
| ImmutableSet.<ScheduledTask>builder() |
| .addAll(IScheduledTask.toBuildersList(saved)) |
| .add(mutated.get().newBuilder()) |
| .build()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getUnsafeTaskStore().saveTasks(saved); |
| assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveTasksQuery() throws Exception { |
| IScheduledTask task = task("a", ScheduleStatus.FINISHED); |
| Set<String> taskIds = Tasks.ids(task); |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.taskStore.deleteTasks(taskIds); |
| expectPersist(Op.removeTasks(new RemoveTasks(taskIds))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getUnsafeTaskStore().deleteTasks(taskIds); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveTasksIds() throws Exception { |
| Set<String> taskIds = ImmutableSet.of("42"); |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.taskStore.deleteTasks(taskIds); |
| expectPersist(Op.removeTasks(new RemoveTasks(taskIds))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getUnsafeTaskStore().deleteTasks(taskIds); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveQuota() throws Exception { |
| String role = "role"; |
| IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.quotaStore.saveQuota(role, quota); |
| expectPersist(Op.saveQuota(new SaveQuota(role, quota.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getQuotaStore().saveQuota(role, quota); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveQuota() throws Exception { |
| String role = "role"; |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.quotaStore.removeQuota(role); |
| expectPersist(Op.removeQuota(new RemoveQuota(role))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getQuotaStore().removeQuota(role); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveHostAttributes() throws Exception { |
| String host = "hostname"; |
| Set<Attribute> attributes = |
| ImmutableSet.of(new Attribute().setName("attr").setValues(ImmutableSet.of("value"))); |
| Optional<IHostAttributes> hostAttributes = Optional.of( |
| IHostAttributes.build(new HostAttributes() |
| .setHost(host) |
| .setAttributes(attributes))); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| expect(storageUtil.attributeStore.getHostAttributes(host)) |
| .andReturn(Optional.empty()); |
| |
| expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes); |
| |
| expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true); |
| eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get())); |
| expectPersist( |
| Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder()))); |
| |
| expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())) |
| .andReturn(false); |
| |
| expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| AttributeStore.Mutable store = storeProvider.getAttributeStore(); |
| assertEquals(Optional.empty(), store.getHostAttributes(host)); |
| |
| assertTrue(store.saveHostAttributes(hostAttributes.get())); |
| |
| assertEquals(hostAttributes, store.getHostAttributes(host)); |
| |
| assertFalse(store.saveHostAttributes(hostAttributes.get())); |
| |
| assertEquals(hostAttributes, store.getHostAttributes(host)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveUpdate() throws Exception { |
| IJobUpdate update = IJobUpdate.build(new JobUpdate() |
| .setSummary(new JobUpdateSummary() |
| .setKey(UPDATE_ID.newBuilder()) |
| .setUser("user")) |
| .setInstructions(new JobUpdateInstructions() |
| .setDesiredState(new InstanceTaskConfig() |
| .setTask(new TaskConfig()) |
| .setInstances(ImmutableSet.of(new Range(0, 3)))) |
| .setInitialState(ImmutableSet.of(new InstanceTaskConfig() |
| .setTask(new TaskConfig()) |
| .setInstances(ImmutableSet.of(new Range(0, 3))))) |
| .setSettings(new JobUpdateSettings()))); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobUpdateStore.saveJobUpdate(update); |
| expectPersist(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getJobUpdateStore().saveJobUpdate(update); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveJobUpdateEvent() throws Exception { |
| IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent() |
| .setStatus(JobUpdateStatus.ROLLING_BACK) |
| .setTimestampMs(12345L)); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event); |
| expectPersist(Op.saveJobUpdateEvent(new SaveJobUpdateEvent( |
| event.newBuilder(), |
| UPDATE_ID.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveJobInstanceUpdateEvent() throws Exception { |
| IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new JobInstanceUpdateEvent() |
| .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK) |
| .setTimestampMs(12345L) |
| .setInstanceId(0)); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, event); |
| expectPersist(Op.saveJobInstanceUpdateEvent( |
| new SaveJobInstanceUpdateEvent( |
| event.newBuilder(), |
| UPDATE_ID.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveJobUpdates() throws Exception { |
| IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey() |
| .setJob(JOB_KEY.newBuilder()) |
| .setId("update-id")); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() throws Exception { |
| storageUtil.expectWrite(); |
| storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key)); |
| |
| // No transaction is generated since this version is currently in 'read-only' |
| // compatibility mode for this operation type. |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key)); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testSaveHostMaintenanceRequest() throws Exception { |
| String host = "hostname"; |
| IHostMaintenanceRequest hostMaintenanceRequest = IHostMaintenanceRequest.build( |
| new HostMaintenanceRequest() |
| .setHost(host) |
| .setDefaultSlaPolicy(SlaPolicy.percentageSlaPolicy( |
| new PercentageSlaPolicy() |
| .setPercentage(95) |
| .setDurationSecs(1800))) |
| .setTimeoutSecs(1800) |
| ); |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() { |
| storageUtil.expectWrite(); |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(hostMaintenanceRequest); |
| expectPersist(Op.saveHostMaintenanceRequest( |
| new SaveHostMaintenanceRequest(hostMaintenanceRequest.newBuilder()))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getHostMaintenanceStore().saveHostMaintenanceRequest( |
| hostMaintenanceRequest); |
| } |
| }.run(); |
| } |
| |
| @Test |
| public void testRemoveHostMaintenanceRequest() throws Exception { |
| String host = "hostname"; |
| |
| new AbstractMutationFixture() { |
| @Override |
| protected void setupExpectations() { |
| storageUtil.expectWrite(); |
| storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(host); |
| expectPersist(Op.removeHostMaintenanceRequest(new RemoveHostMaintenanceRequest(host))); |
| } |
| |
| @Override |
| protected void performMutations(MutableStoreProvider storeProvider) { |
| storeProvider.getHostMaintenanceStore().removeHostMaintenanceRequest(host); |
| } |
| }.run(); |
| } |
| |
| private static IScheduledTask task(String id, ScheduleStatus status) { |
| return IScheduledTask.build(new ScheduledTask() |
| .setStatus(status) |
| .setAssignedTask(new AssignedTask() |
| .setTaskId(id) |
| .setTask(new TaskConfig()))); |
| } |
| } |