| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.storage; |
| |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.inject.Guice; |
| import com.google.inject.Injector; |
| import com.google.inject.Module; |
| |
| import org.apache.aurora.gen.BatchJobUpdateStrategy; |
| import org.apache.aurora.gen.InstanceTaskConfig; |
| import org.apache.aurora.gen.JobInstanceUpdateEvent; |
| import org.apache.aurora.gen.JobUpdate; |
| import org.apache.aurora.gen.JobUpdateAction; |
| import org.apache.aurora.gen.JobUpdateDetails; |
| import org.apache.aurora.gen.JobUpdateEvent; |
| import org.apache.aurora.gen.JobUpdateInstructions; |
| import org.apache.aurora.gen.JobUpdateKey; |
| import org.apache.aurora.gen.JobUpdateQuery; |
| import org.apache.aurora.gen.JobUpdateSettings; |
| import org.apache.aurora.gen.JobUpdateState; |
| import org.apache.aurora.gen.JobUpdateStatus; |
| import org.apache.aurora.gen.JobUpdateStrategy; |
| import org.apache.aurora.gen.JobUpdateSummary; |
| import org.apache.aurora.gen.Metadata; |
| import org.apache.aurora.gen.Range; |
| import org.apache.aurora.gen.TaskConfig; |
| import org.apache.aurora.scheduler.base.JobKeys; |
| import org.apache.aurora.scheduler.base.TaskTestUtil; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; |
| import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; |
| import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_ROLLED_BACK; |
| import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_ROLLING_BACK; |
| import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATED; |
| import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATING; |
| import static org.apache.aurora.gen.JobUpdateStatus.ABORTED; |
| import static org.apache.aurora.gen.JobUpdateStatus.ERROR; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED; |
| import static org.apache.aurora.gen.Resource.diskMb; |
| import static org.apache.aurora.gen.Resource.numCpus; |
| import static org.apache.aurora.gen.Resource.ramMb; |
| import static org.junit.Assert.assertEquals; |
| |
| public abstract class AbstractJobUpdateStoreTest { |
| |
| private static final IJobKey JOB = JobKeys.from("testRole", "testEnv", "job"); |
| private static final long CREATED_MS = 111L; |
| private static final IJobUpdateEvent FIRST_EVENT = |
| makeJobUpdateEvent(ROLLING_FORWARD, CREATED_MS); |
| private static final ImmutableSet<Metadata> METADATA = |
| ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"), new Metadata("k3", "v3")); |
| |
| protected Injector injector; |
| protected Storage storage; |
| |
| protected abstract Module getStorageModule(); |
| |
| @Before |
| public void setUp() { |
| injector = Guice.createInjector(getStorageModule()); |
| storage = injector.getInstance(Storage.class); |
| storage.prepare(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| truncateUpdates(); |
| } |
| |
| private static IJobUpdateDetails makeFullyPopulatedUpdate(IJobUpdateKey key) { |
| JobUpdateDetails builder = makeJobUpdate(key).newBuilder() |
| .setUpdateEvents(ImmutableList.of(new JobUpdateEvent() |
| .setStatus(ROLLING_FORWARD) |
| .setTimestampMs(1) |
| .setUser("user") |
| .setMessage("message"))) |
| .setInstanceEvents(ImmutableList.of(new JobInstanceUpdateEvent() |
| .setTimestampMs(2) |
| .setInstanceId(1) |
| .setAction(INSTANCE_UPDATING) |
| .setMessage("message2"))); |
| JobUpdateInstructions instructions = builder.getUpdate().getInstructions(); |
| Stream.of( |
| instructions.getInitialState().stream() |
| .map(InstanceTaskConfig::getInstances) |
| .flatMap(Set::stream) |
| .collect(Collectors.toSet()), |
| instructions.getDesiredState().getInstances(), |
| instructions.getSettings().getUpdateOnlyTheseInstances()) |
| .flatMap(Set::stream) |
| .forEach(range -> { |
| if (range.getFirst() == 0) { |
| range.setFirst(1); |
| } |
| if (range.getLast() == 0) { |
| range.setLast(1); |
| } |
| }); |
| return IJobUpdateDetails.build(builder); |
| } |
| |
| @Test |
| public void testSaveJobUpdates() { |
| IJobUpdateKey updateId1 = makeKey(JobKeys.from("role", "env", "name1"), "u1"); |
| IJobUpdateKey updateId2 = makeKey(JobKeys.from("role", "env", "name2"), "u2"); |
| |
| IJobUpdateDetails update1 = makeFullyPopulatedUpdate(updateId1); |
| IJobUpdateDetails update2 = makeJobUpdate(updateId2); |
| |
| assertEquals(Optional.empty(), getUpdate(updateId1)); |
| assertEquals(Optional.empty(), getUpdate(updateId2)); |
| |
| StorageEntityUtil.assertFullyPopulated( |
| update1, |
| StorageEntityUtil.getField(JobUpdateSummary.class, "state"), |
| StorageEntityUtil.getField(IJobUpdateSummary.class, "state"), |
| StorageEntityUtil.getField(Range.class, "first"), |
| StorageEntityUtil.getField(Range.class, "last")); |
| saveUpdate(update1); |
| assertEquals(withUpdateState(update1, ROLLING_FORWARD, 1, 2), getUpdate(key(update1)).get()); |
| |
| saveUpdate(update2); |
| assertUpdate(update2); |
| } |
| |
| @Test |
| public void testSaveJobUpdateWithLargeTaskConfigValues() { |
| // AURORA-1494 regression test validating max resources values are allowed. |
| IJobUpdateKey updateId = makeKey(JobKeys.from("role", "env", "name1"), "u1"); |
| |
| JobUpdateDetails builder = makeFullyPopulatedUpdate(updateId).newBuilder(); |
| builder.getUpdate().getInstructions().getDesiredState().getTask().setResources( |
| ImmutableSet.of( |
| numCpus(Double.MAX_VALUE), |
| ramMb(Long.MAX_VALUE), |
| diskMb(Long.MAX_VALUE))); |
| |
| IJobUpdateDetails update = makeFullyPopulatedUpdate(updateId); |
| StorageEntityUtil.assertFullyPopulated( |
| update, |
| StorageEntityUtil.getField(JobUpdateSummary.class, "state"), |
| StorageEntityUtil.getField(IJobUpdateSummary.class, "state"), |
| StorageEntityUtil.getField(Range.class, "first"), |
| StorageEntityUtil.getField(Range.class, "last")); |
| saveUpdate(update); |
| assertEquals(withUpdateState(update, ROLLING_FORWARD, 1, 2), getUpdate(key(update)).get()); |
| } |
| |
| @Test |
| public void testSaveNullInitialState() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().unsetInitialState(); |
| |
| // Save with null initial state instances. |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| |
| builder.getUpdate().getInstructions().setInitialState(ImmutableSet.of()); |
| assertUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test |
| public void testSaveNullDesiredState() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().unsetDesiredState(); |
| |
| // Save with null desired state instances. |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| |
| assertUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testSaveBothInitialAndDesiredMissingThrows() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().unsetInitialState(); |
| builder.getUpdate().getInstructions().unsetDesiredState(); |
| |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test(expected = NullPointerException.class) |
| public void testSaveNullInitialStateTaskThrows() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().getInitialState().add( |
| new InstanceTaskConfig(null, ImmutableSet.of())); |
| |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testSaveEmptyInitialStateRangesThrows() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().getInitialState().add( |
| new InstanceTaskConfig( |
| TaskTestUtil.makeConfig(TaskTestUtil.JOB).newBuilder(), |
| ImmutableSet.of())); |
| |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test(expected = NullPointerException.class) |
| public void testSaveNullDesiredStateTaskThrows() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().getDesiredState().setTask(null); |
| |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testSaveEmptyDesiredStateRangesThrows() { |
| JobUpdateDetails builder = makeJobUpdate(makeKey("u1")).newBuilder(); |
| builder.getUpdate().getInstructions().getDesiredState().setInstances(ImmutableSet.of()); |
| |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| } |
| |
| @Test |
| public void testSaveJobUpdateEmptyInstanceOverrides() { |
| IJobUpdateKey updateId = makeKey("u1"); |
| |
| JobUpdateDetails builder = makeJobUpdate(updateId).newBuilder(); |
| builder.getUpdate().getInstructions().getSettings() |
| .setUpdateOnlyTheseInstances(ImmutableSet.of()); |
| |
| IJobUpdateDetails expected = IJobUpdateDetails.build(builder); |
| |
| // Save with empty overrides. |
| saveUpdate(expected); |
| assertUpdate(expected); |
| } |
| |
| @Test |
| public void testSaveJobUpdateNullInstanceOverrides() { |
| IJobUpdateKey updateId = makeKey("u1"); |
| |
| JobUpdateDetails builder = makeJobUpdate(updateId).newBuilder(); |
| builder.getUpdate().getInstructions().getSettings() |
| .setUpdateOnlyTheseInstances(ImmutableSet.of()); |
| |
| IJobUpdateDetails expected = IJobUpdateDetails.build(builder); |
| |
| // Save with null overrides. |
| builder.getUpdate().getInstructions().getSettings().setUpdateOnlyTheseInstances(null); |
| saveUpdate(IJobUpdateDetails.build(builder)); |
| assertUpdate(expected); |
| } |
| |
| @Test |
| public void testSaveJobUpdateStateIgnored() { |
| IJobUpdateKey updateId = makeKey("u1"); |
| IJobUpdateDetails update = withUpdateState(makeJobUpdate(updateId), ABORTED, 567L, 567L); |
| saveUpdate(update); |
| |
| // Assert state fields were ignored. |
| assertUpdate(withDefaultUpdateState(update)); |
| } |
| |
| @Test |
| public void testMultipleJobDetails() { |
| IJobUpdateKey updateId1 = makeKey(JobKeys.from("role", "env", "name1"), "u1"); |
| IJobUpdateKey updateId2 = makeKey(JobKeys.from("role", "env", "name2"), "u2"); |
| IJobUpdateDetails update1 = makeJobUpdate(updateId1); |
| IJobUpdateDetails update2 = makeJobUpdate(updateId2); |
| |
| saveUpdate(update1); |
| saveUpdate(update2); |
| |
| update1 = updateJobDetails(withDefaultUpdateState(update1), FIRST_EVENT); |
| update2 = updateJobDetails(withDefaultUpdateState(update2), FIRST_EVENT); |
| assertEquals(Optional.of(update1), getUpdateDetails(updateId1)); |
| assertEquals(Optional.of(update2), getUpdateDetails(updateId2)); |
| |
| IJobUpdateEvent jEvent11 = makeJobUpdateEvent(ROLLING_FORWARD, 456L); |
| IJobUpdateEvent jEvent12 = makeJobUpdateEvent(ERROR, 457L); |
| IJobInstanceUpdateEvent iEvent11 = makeJobInstanceEvent(1, 451L, INSTANCE_UPDATED); |
| IJobInstanceUpdateEvent iEvent12 = makeJobInstanceEvent(2, 452L, INSTANCE_UPDATING); |
| |
| IJobUpdateEvent jEvent21 = makeJobUpdateEvent(ROLL_FORWARD_PAUSED, 567L); |
| IJobUpdateEvent jEvent22 = makeJobUpdateEvent(ABORTED, 568L); |
| IJobInstanceUpdateEvent iEvent21 = makeJobInstanceEvent(3, 561L, INSTANCE_UPDATING); |
| IJobInstanceUpdateEvent iEvent22 = makeJobInstanceEvent(3, 562L, INSTANCE_UPDATED); |
| |
| saveJobEvent(jEvent11, updateId1); |
| saveJobEvent(jEvent12, updateId1); |
| saveJobInstanceEvent(iEvent11, updateId1); |
| saveJobInstanceEvent(iEvent12, updateId1); |
| |
| saveJobEvent(jEvent21, updateId2); |
| saveJobEvent(jEvent22, updateId2); |
| assertEquals(ImmutableList.of(), getInstanceEvents(updateId2, 3)); |
| saveJobInstanceEvent(iEvent21, updateId2); |
| |
| assertEquals(ImmutableList.of(iEvent21), getInstanceEvents(updateId2, 3)); |
| saveJobInstanceEvent(iEvent22, updateId2); |
| assertEquals(ImmutableList.of(iEvent21, iEvent22), getInstanceEvents(updateId2, 3)); |
| |
| update1 = updateJobDetails( |
| withUpdateState(update1, ERROR, CREATED_MS, 457L), |
| ImmutableList.of(FIRST_EVENT, jEvent11, jEvent12), ImmutableList.of(iEvent11, iEvent12)); |
| |
| update2 = updateJobDetails( |
| withUpdateState(update2, ABORTED, CREATED_MS, 568L), |
| ImmutableList.of(FIRST_EVENT, jEvent21, jEvent22), ImmutableList.of(iEvent21, iEvent22)); |
| |
| assertEquals( |
| ImmutableList.of(update2, update1), |
| fetchUpdates(JobUpdateStore.MATCH_ALL.newBuilder())); |
| |
| assertEquals( |
| ImmutableList.of(getUpdateDetails(updateId2).get(), getUpdateDetails(updateId1).get()), |
| fetchUpdates(new JobUpdateQuery().setRole("role"))); |
| } |
| |
| @Test |
| public void testTruncateJobUpdates() { |
| saveUpdate(makeJobUpdate(makeKey("u1"))); |
| saveUpdate(makeJobUpdate(makeKey("u2"))); |
| assertEquals(2, fetchUpdates(new JobUpdateQuery()).size()); |
| truncateUpdates(); |
| assertEquals(0, fetchUpdates(new JobUpdateQuery()).size()); |
| truncateUpdates(); |
| assertEquals(0, fetchUpdates(new JobUpdateQuery()).size()); |
| } |
| |
| @Test |
| public void testRemoveUpdates() { |
| IJobUpdateDetails u1 = makeJobUpdate(makeKey("u1")); |
| IJobUpdateDetails u2 = makeJobUpdate(makeKey("u2")); |
| IJobUpdateDetails u3 = makeJobUpdate(makeKey("u3")); |
| saveUpdate(u1); |
| saveUpdate(u2); |
| saveUpdate(u3); |
| |
| storage.write((NoResult.Quiet) store -> |
| store.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key(u1), key(u3)))); |
| assertQueryMatches(new JobUpdateQuery(), withDefaultUpdateState(u2)); |
| |
| // Noop - update already deleted |
| storage.write((NoResult.Quiet) store -> |
| store.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key(u1)))); |
| assertQueryMatches(new JobUpdateQuery(), withDefaultUpdateState(u2)); |
| |
| storage.write((NoResult.Quiet) store -> |
| store.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key(u2)))); |
| assertQueryMatches(new JobUpdateQuery()); |
| } |
| |
| @Test |
| public void testSaveJobUpdateWithDuplicateMetadataKeys() { |
| IJobUpdateKey updateId = makeKey(JobKeys.from("role", "env", "name1"), "u1"); |
| |
| ImmutableSet<Metadata> duplicatedMetadata = |
| ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k1", "v2")); |
| JobUpdateDetails builder = makeJobUpdate(updateId).newBuilder(); |
| builder.getUpdate().getSummary().setMetadata(duplicatedMetadata); |
| |
| assertEquals(Optional.empty(), getUpdate(updateId)); |
| |
| IJobUpdateDetails update = IJobUpdateDetails.build(builder); |
| saveUpdate(update); |
| assertUpdate(update); |
| } |
| |
| @Test |
| public void testQueryDetails() { |
| IJobKey jobKey1 = JobKeys.from("role1", "env", "name1"); |
| IJobUpdateKey updateId1 = makeKey(jobKey1, "u1"); |
| IJobKey jobKey2 = JobKeys.from("role2", "env", "name2"); |
| IJobUpdateKey updateId2 = makeKey(jobKey2, "u2"); |
| |
| IJobUpdateDetails update1 = makeJobUpdate(updateId1); |
| IJobUpdateDetails update2 = makeJobUpdate(updateId2); |
| |
| saveUpdate(update1); |
| saveUpdate(update2); |
| |
| updateJobDetails(withDefaultUpdateState(update1), FIRST_EVENT); |
| updateJobDetails(withDefaultUpdateState(update2), FIRST_EVENT); |
| |
| IJobUpdateEvent jEvent11 = makeJobUpdateEvent(ROLLING_BACK, 450L); |
| IJobUpdateEvent jEvent12 = makeJobUpdateEvent(ROLLED_BACK, 500L); |
| IJobInstanceUpdateEvent iEvent11 = makeJobInstanceEvent(1, 451L, INSTANCE_ROLLING_BACK); |
| IJobInstanceUpdateEvent iEvent12 = makeJobInstanceEvent(2, 458L, INSTANCE_ROLLED_BACK); |
| |
| IJobUpdateEvent jEvent21 = makeJobUpdateEvent(ROLL_FORWARD_PAUSED, 550L); |
| IJobUpdateEvent jEvent22 = makeJobUpdateEvent(ABORTED, 600L); |
| IJobInstanceUpdateEvent iEvent21 = makeJobInstanceEvent(3, 561L, INSTANCE_UPDATING); |
| IJobInstanceUpdateEvent iEvent22 = makeJobInstanceEvent(3, 570L, INSTANCE_UPDATED); |
| |
| saveJobEvent(jEvent11, updateId1); |
| saveJobEvent(jEvent12, updateId1); |
| saveJobInstanceEvent(iEvent11, updateId1); |
| saveJobInstanceEvent(iEvent12, updateId1); |
| |
| saveJobEvent(jEvent21, updateId2); |
| saveJobEvent(jEvent22, updateId2); |
| |
| saveJobInstanceEvent(iEvent21, updateId2); |
| saveJobInstanceEvent(iEvent22, updateId2); |
| |
| // Fetch the updates for checking query results. This avoids the need to manually populate |
| // any fields filled by the storage, which is out of scope for this test case. |
| update1 = getUpdateDetails(updateId1).get(); |
| update2 = getUpdateDetails(updateId2).get(); |
| |
| // Empty query returns all. |
| assertQueryMatches(new JobUpdateQuery(), update2, update1); |
| |
| // Query by update ID. |
| assertQueryMatches(new JobUpdateQuery().setKey(updateId1.newBuilder()), update1); |
| |
| // Query by role. |
| assertQueryMatches(new JobUpdateQuery().setRole(jobKey2.getRole()), update2); |
| |
| // Query by job key. |
| assertQueryMatches(new JobUpdateQuery().setJobKey(jobKey2.newBuilder()), update2); |
| |
| // Query by status. |
| assertQueryMatches(new JobUpdateQuery().setUpdateStatuses(ImmutableSet.of(ABORTED)), update2); |
| |
| // No match. |
| assertQueryMatches(new JobUpdateQuery().setRole("no match")); |
| |
| // Querying by incorrect update keys. |
| assertQueryMatches(new JobUpdateQuery().setJobKey(JobKeys.from("a", "b", "c").newBuilder())); |
| |
| // Query by multiple statuses. |
| assertQueryMatches( |
| new JobUpdateQuery().setUpdateStatuses(ImmutableSet.of(ABORTED, ROLLING_FORWARD)), |
| update2); |
| |
| // Query by empty statuses. |
| assertQueryMatches(new JobUpdateQuery().setUpdateStatuses(ImmutableSet.of()), update2, update1); |
| |
| // Query by user. |
| assertQueryMatches(new JobUpdateQuery().setUser("user"), update2, update1); |
| |
| // Test paging. |
| assertQueryMatches(new JobUpdateQuery().setLimit(1).setOffset(0), update2); |
| assertQueryMatches(new JobUpdateQuery().setLimit(2).setOffset(0), update2, update1); |
| assertQueryMatches(new JobUpdateQuery().setLimit(3).setOffset(0), update2, update1); |
| assertQueryMatches(new JobUpdateQuery().setLimit(1).setOffset(1), update1); |
| assertQueryMatches(new JobUpdateQuery().setLimit(2).setOffset(4)); |
| } |
| |
| private static IJobUpdateKey key(IJobUpdateDetails update) { |
| return update.getUpdate().getSummary().getKey(); |
| } |
| |
| private void assertQueryMatches(JobUpdateQuery query, IJobUpdateDetails... matches) { |
| assertEquals( |
| ImmutableList.copyOf(matches), |
| storage.read(store -> |
| store.getJobUpdateStore().fetchJobUpdates(IJobUpdateQuery.build(query)))); |
| } |
| |
| private static IJobUpdateKey makeKey(String id) { |
| return makeKey(JOB, id); |
| } |
| |
| protected static IJobUpdateKey makeKey(IJobKey job, String id) { |
| return IJobUpdateKey.build(new JobUpdateKey(job.newBuilder(), id)); |
| } |
| |
| private void assertUpdate(IJobUpdateDetails expected) { |
| assertEquals(withDefaultUpdateState(expected), getUpdate(key(expected)).get()); |
| } |
| |
| private Optional<IJobUpdateDetails> getUpdate(IJobUpdateKey key) { |
| return storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdate(key)); |
| } |
| |
| private List<IJobInstanceUpdateEvent> getInstanceEvents(IJobUpdateKey key, int id) { |
| IJobUpdateDetails update = |
| storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(key).get()); |
| return update.getInstanceEvents().stream() |
| .filter(e -> e.getInstanceId() == id) |
| .collect(Collectors.toList()); |
| } |
| |
| private Optional<IJobUpdateDetails> getUpdateDetails(IJobUpdateKey key) { |
| return storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdate(key)); |
| } |
| |
| private List<IJobUpdateDetails> fetchUpdates(JobUpdateQuery query) { |
| return storage.read(storeProvider -> |
| storeProvider.getJobUpdateStore().fetchJobUpdates(IJobUpdateQuery.build(query))); |
| } |
| |
| protected void saveUpdate(IJobUpdateDetails update) { |
| storage.write((NoResult.Quiet) storeProvider -> { |
| JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore(); |
| store.saveJobUpdate(update.getUpdate()); |
| IJobUpdateKey key = update.getUpdate().getSummary().getKey(); |
| update.getUpdateEvents().forEach(event -> { |
| store.saveJobUpdateEvent(key, event); |
| }); |
| update.getInstanceEvents().forEach(event -> { |
| store.saveJobInstanceUpdateEvent(key, event); |
| }); |
| }); |
| } |
| |
| private void saveJobEvent(IJobUpdateEvent event, IJobUpdateKey key) { |
| storage.write((NoResult.Quiet) |
| storeProvider -> storeProvider.getJobUpdateStore().saveJobUpdateEvent(key, event)); |
| } |
| |
| private void saveJobInstanceEvent(IJobInstanceUpdateEvent event, IJobUpdateKey key) { |
| storage.write((NoResult.Quiet) |
| storeProvider -> storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(key, event)); |
| } |
| |
| protected void truncateUpdates() { |
| storage.write((NoResult.Quiet) |
| storeProvider -> storeProvider.getJobUpdateStore().deleteAllUpdates()); |
| } |
| |
| private IJobUpdateDetails withDefaultUpdateState(IJobUpdateDetails update) { |
| return withUpdateState(update, ROLLING_FORWARD, CREATED_MS, CREATED_MS); |
| } |
| |
| private IJobUpdateDetails withUpdateState( |
| IJobUpdateDetails update, |
| JobUpdateStatus status, |
| long createdMs, |
| long lastMs) { |
| |
| JobUpdateState state = new JobUpdateState() |
| .setCreatedTimestampMs(createdMs) |
| .setLastModifiedTimestampMs(lastMs) |
| .setStatus(status); |
| JobUpdateDetails builder = update.newBuilder(); |
| builder.getUpdate().getSummary().setState(state); |
| return IJobUpdateDetails.build(builder); |
| } |
| |
| private static IJobUpdateEvent makeJobUpdateEvent(JobUpdateStatus status, long timestampMs) { |
| return IJobUpdateEvent.build( |
| new JobUpdateEvent(status, timestampMs) |
| .setUser("user") |
| .setMessage("message")); |
| } |
| |
| private IJobInstanceUpdateEvent makeJobInstanceEvent( |
| int instanceId, |
| long timestampMs, |
| JobUpdateAction action) { |
| |
| return IJobInstanceUpdateEvent.build( |
| new JobInstanceUpdateEvent(instanceId, timestampMs, action)); |
| } |
| |
| private IJobUpdateDetails updateJobDetails(IJobUpdateDetails update, IJobUpdateEvent event) { |
| return updateJobDetails( |
| update, |
| ImmutableList.of(event), |
| ImmutableList.of()); |
| } |
| |
| private IJobUpdateDetails updateJobDetails( |
| IJobUpdateDetails update, |
| List<IJobUpdateEvent> jobEvents, |
| List<IJobInstanceUpdateEvent> instanceEvents) { |
| |
| return IJobUpdateDetails.build(update.newBuilder() |
| .setUpdateEvents(IJobUpdateEvent.toBuildersList(jobEvents)) |
| .setInstanceEvents(IJobInstanceUpdateEvent.toBuildersList(instanceEvents))); |
| } |
| |
| private static IJobUpdateSummary makeSummary(IJobUpdateKey key, String user) { |
| return IJobUpdateSummary.build(new JobUpdateSummary() |
| .setKey(key.newBuilder()) |
| .setUser(user) |
| .setMetadata(METADATA)); |
| } |
| |
| protected static IJobUpdateDetails makeJobUpdate(IJobUpdateKey key) { |
| return IJobUpdateDetails.build(new JobUpdateDetails() |
| .setUpdateEvents(ImmutableList.of(FIRST_EVENT.newBuilder())) |
| .setUpdate(new JobUpdate() |
| .setInstructions(makeJobUpdateInstructions().newBuilder()) |
| .setSummary(makeSummary(key, "user").newBuilder()))); |
| } |
| |
| private static IJobUpdateInstructions makeJobUpdateInstructions() { |
| TaskConfig config = TaskTestUtil.makeConfig(JOB).newBuilder(); |
| return IJobUpdateInstructions.build(new JobUpdateInstructions() |
| .setDesiredState(new InstanceTaskConfig() |
| .setTask(config) |
| .setInstances(ImmutableSet.of(new Range(0, 7), new Range(8, 9)))) |
| .setInitialState(ImmutableSet.of( |
| new InstanceTaskConfig() |
| .setInstances(ImmutableSet.of(new Range(0, 1), new Range(2, 3))) |
| .setTask(config), |
| new InstanceTaskConfig() |
| .setInstances(ImmutableSet.of(new Range(4, 5), new Range(6, 7))) |
| .setTask(config))) |
| .setSettings(new JobUpdateSettings() |
| .setBlockIfNoPulsesAfterMs(500) |
| .setUpdateStrategy( |
| JobUpdateStrategy.batchStrategy(new BatchJobUpdateStrategy().setGroupSize(1))) |
| .setUpdateGroupSize(1) // TODO(rdelvalle): Remove when thrift field deprecated. |
| .setMaxPerInstanceFailures(1) |
| .setMaxFailedInstances(1) |
| .setMinWaitInInstanceRunningMs(200) |
| .setRollbackOnFailure(true) |
| .setSlaAware(true) |
| .setWaitForBatchCompletion(true) |
| .setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(0, 0), new Range(3, 5))))); |
| } |
| } |