| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.overlord; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonIgnore; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.jsontype.NamedType; |
| import com.fasterxml.jackson.databind.module.SimpleModule; |
| import com.google.common.collect.Iterables; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.LockGranularity; |
| import org.apache.druid.indexing.common.SegmentLock; |
| import org.apache.druid.indexing.common.TaskLock; |
| import org.apache.druid.indexing.common.TaskLockType; |
| import org.apache.druid.indexing.common.TaskToolbox; |
| import org.apache.druid.indexing.common.TimeChunkLock; |
| import org.apache.druid.indexing.common.actions.TaskActionClient; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.common.config.TaskStorageConfig; |
| import org.apache.druid.indexing.common.task.AbstractTask; |
| import org.apache.druid.indexing.common.task.NoopTask; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; |
| import org.apache.druid.metadata.EntryExistsException; |
| import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; |
| import org.apache.druid.metadata.MetadataStorageTablesConfig; |
| import org.apache.druid.metadata.TestDerbyConnector; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; |
| import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; |
| import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; |
| import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec; |
| import org.apache.druid.timeline.partition.NumberedPartialShardSpec; |
| import org.apache.druid.timeline.partition.NumberedShardSpec; |
| import org.apache.druid.timeline.partition.PartialShardSpec; |
| import org.apache.druid.timeline.partition.PartitionIds; |
| import org.easymock.EasyMock; |
| import org.joda.time.Interval; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| public class TaskLockboxTest |
| { |
| @Rule |
| public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
| private ObjectMapper objectMapper; |
| private TaskStorage taskStorage; |
| private IndexerMetadataStorageCoordinator metadataStorageCoordinator; |
| private TaskLockbox lockbox; |
| |
| @Rule |
| public final ExpectedException exception = ExpectedException.none(); |
| |
| @Before |
| public void setup() |
| { |
| objectMapper = TestHelper.makeJsonMapper(); |
| objectMapper.registerSubtypes(NumberedShardSpec.class, HashBasedNumberedShardSpec.class); |
| |
| final TestDerbyConnector derbyConnector = derby.getConnector(); |
| derbyConnector.createTaskTables(); |
| derbyConnector.createPendingSegmentsTable(); |
| derbyConnector.createSegmentTable(); |
| final MetadataStorageTablesConfig tablesConfig = derby.metadataTablesConfigSupplier().get(); |
| taskStorage = new MetadataTaskStorage( |
| derbyConnector, |
| new TaskStorageConfig(null), |
| new DerbyMetadataStorageActionHandlerFactory( |
| derbyConnector, |
| tablesConfig, |
| objectMapper |
| ) |
| ); |
| ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); |
| EmittingLogger.registerEmitter(emitter); |
| EasyMock.replay(emitter); |
| |
| metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(objectMapper, tablesConfig, derbyConnector); |
| |
| lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| } |
| |
| private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs) |
| throws InterruptedException |
| { |
| return lockbox.lock(task, new TimeChunkLockRequest(lockType, task, interval, null), timeoutMs); |
| } |
| |
| private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval) |
| throws InterruptedException |
| { |
| return lockbox.lock(task, new TimeChunkLockRequest(lockType, task, interval, null)); |
| } |
| |
| private LockResult tryTimeChunkLock(TaskLockType lockType, Task task, Interval interval) |
| { |
| return lockbox.tryLock(task, new TimeChunkLockRequest(lockType, task, interval, null)); |
| } |
| |
| @Test |
| public void testLock() throws InterruptedException |
| { |
| Task task = NoopTask.create(); |
| lockbox.add(task); |
| Assert.assertNotNull(acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02"))); |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testLockForInactiveTask() throws InterruptedException |
| { |
| acquireTimeChunkLock(TaskLockType.EXCLUSIVE, NoopTask.create(), Intervals.of("2015-01-01/2015-01-02")); |
| } |
| |
| @Test |
| public void testLockAfterTaskComplete() throws InterruptedException |
| { |
| Task task = NoopTask.create(); |
| exception.expect(ISE.class); |
| exception.expectMessage("Unable to grant lock to inactive Task"); |
| lockbox.add(task); |
| lockbox.remove(task); |
| acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")); |
| } |
| |
| @Test |
| public void testTrySharedLock() |
| { |
| final Interval interval = Intervals.of("2017-01/2017-02"); |
| final List<Task> tasks = new ArrayList<>(); |
| final Set<TaskLock> actualLocks = new HashSet<>(); |
| |
| // test creating new locks |
| for (int i = 0; i < 5; i++) { |
| final Task task = NoopTask.create(Math.min(0, (i - 1) * 10)); // the first two tasks have the same priority |
| tasks.add(task); |
| lockbox.add(task); |
| final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock(); |
| Assert.assertNotNull(lock); |
| actualLocks.add(lock); |
| } |
| |
| Assert.assertEquals(5, getAllLocks(tasks).size()); |
| Assert.assertEquals(getAllLocks(tasks), actualLocks); |
| } |
| |
| @Test |
| public void testTryMixedLocks() throws EntryExistsException |
| { |
| final Task lowPriorityTask = NoopTask.create(0); |
| final Task lowPriorityTask2 = NoopTask.create(0); |
| final Task highPiorityTask = NoopTask.create(10); |
| final Interval interval1 = Intervals.of("2017-01-01/2017-01-02"); |
| final Interval interval2 = Intervals.of("2017-01-02/2017-01-03"); |
| final Interval interval3 = Intervals.of("2017-01-03/2017-01-04"); |
| |
| taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); |
| taskStorage.insert(lowPriorityTask2, TaskStatus.running(lowPriorityTask2.getId())); |
| taskStorage.insert(highPiorityTask, TaskStatus.running(highPiorityTask.getId())); |
| |
| lockbox.add(lowPriorityTask); |
| lockbox.add(lowPriorityTask2); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval1).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, lowPriorityTask, interval2).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, lowPriorityTask2, interval2).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval3).isOk()); |
| |
| lockbox.add(highPiorityTask); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, highPiorityTask, interval1).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval2).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval3).isOk()); |
| |
| Assert.assertTrue(lockbox.findLocksForTask(lowPriorityTask).stream().allMatch(TaskLock::isRevoked)); |
| Assert.assertTrue(lockbox.findLocksForTask(lowPriorityTask2).stream().allMatch(TaskLock::isRevoked)); |
| |
| lockbox.remove(lowPriorityTask); |
| lockbox.remove(lowPriorityTask2); |
| lockbox.remove(highPiorityTask); |
| |
| lockbox.add(highPiorityTask); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval1).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, highPiorityTask, interval2).isOk()); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval3).isOk()); |
| |
| lockbox.add(lowPriorityTask); |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.SHARED, lowPriorityTask, interval1).isOk()); |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval2).isOk()); |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval3).isOk()); |
| } |
| |
| @Test |
| public void testTryExclusiveLock() |
| { |
| Task task = NoopTask.create(); |
| lockbox.add(task); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-03")).isOk()); |
| |
| // try to take lock for task 2 for overlapping interval |
| Task task2 = NoopTask.create(); |
| lockbox.add(task2); |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-02")).isOk()); |
| |
| // task 1 unlocks the lock |
| lockbox.remove(task); |
| |
| // Now task2 should be able to get the lock |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-02")).isOk()); |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testTryLockForInactiveTask() |
| { |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, NoopTask.create(), Intervals.of("2015-01-01/2015-01-02")).isOk()); |
| } |
| |
| @Test |
| public void testTryLockAfterTaskComplete() |
| { |
| Task task = NoopTask.create(); |
| exception.expect(ISE.class); |
| exception.expectMessage("Unable to grant lock to inactive Task"); |
| lockbox.add(task); |
| lockbox.remove(task); |
| Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")).isOk()); |
| } |
| |
| @Test |
| public void testTimeoutForLock() throws InterruptedException |
| { |
| Task task1 = NoopTask.create(); |
| Task task2 = NoopTask.create(); |
| |
| lockbox.add(task1); |
| lockbox.add(task2); |
| Assert.assertTrue(acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task1, Intervals.of("2015-01-01/2015-01-02"), 5000).isOk()); |
| Assert.assertFalse(acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-15"), 1000).isOk()); |
| } |
| |
| @Test |
| public void testSyncFromStorage() throws EntryExistsException |
| { |
| final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| for (int i = 0; i < 5; i++) { |
| final Task task = NoopTask.create(); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| originalBox.add(task); |
| Assert.assertTrue( |
| originalBox.tryLock( |
| task, |
| new TimeChunkLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task, |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))), |
| null |
| ) |
| ).isOk() |
| ); |
| } |
| |
| final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| newBox.syncFromStorage(); |
| |
| Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks()); |
| Assert.assertEquals(originalBox.getActiveTasks(), newBox.getActiveTasks()); |
| |
| final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); |
| } |
| |
| @Test |
| public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException |
| { |
| final Task task = NoopTask.create(); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| taskStorage.addLock( |
| task.getId(), |
| new IntervalLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1") |
| ); |
| |
| final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| lockbox.syncFromStorage(); |
| |
| final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); |
| } |
| |
| @Test |
| public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException |
| { |
| final Task task = NoopTask.create(); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| taskStorage.addLock( |
| task.getId(), |
| new TimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task.getGroupId(), |
| task.getDataSource(), |
| Intervals.of("2017/2018"), |
| "v1", |
| task.getPriority() |
| ) |
| ); |
| |
| final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| lockbox.syncFromStorage(); |
| |
| final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) |
| .collect(Collectors.toList()); |
| |
| Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); |
| } |
| |
| @Test |
| public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException |
| { |
| final Task task = NoopTask.create(); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| taskStorage.addLock( |
| task.getId(), |
| new TimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task.getGroupId(), |
| task.getDataSource(), |
| Intervals.of("2017/2018"), |
| "v1", |
| 10 |
| ) |
| ); |
| |
| final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| expectedException.expect(IllegalArgumentException.class); |
| expectedException.expectMessage("lock priority[10] is different from task priority[50]"); |
| lockbox.syncFromStorage(); |
| } |
| |
| @Test |
| public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception |
| { |
| // ensure that if we don't know how to deserialize a task it won't explode the lockbox |
| // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things) |
| final TestDerbyConnector derbyConnector = derby.getConnector(); |
| ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new TheModule()); |
| TaskStorage loadedTaskStorage = new MetadataTaskStorage( |
| derbyConnector, |
| new TaskStorageConfig(null), |
| new DerbyMetadataStorageActionHandlerFactory( |
| derbyConnector, |
| derby.metadataTablesConfigSupplier().get(), |
| loadedMapper |
| ) |
| ); |
| IndexerMetadataStorageCoordinator loadedMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator( |
| loadedMapper, |
| derby.metadataTablesConfigSupplier().get(), |
| derbyConnector |
| ); |
| |
| TaskLockbox theBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage, loadedMetadataStorageCoordinator); |
| |
| Task aTask = NoopTask.create(); |
| taskStorage.insert(aTask, TaskStatus.running(aTask.getId())); |
| theBox.add(aTask); |
| loadedBox.add(aTask); |
| |
| Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo"); |
| loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId())); |
| theBox.add(theTask); |
| loadedBox.add(theTask); |
| |
| List<Task> tasks = taskStorage.getActiveTasks(); |
| List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks(); |
| |
| theBox.syncFromStorage(); |
| loadedBox.syncFromStorage(); |
| |
| Assert.assertEquals(1, tasks.size()); |
| Assert.assertEquals(2, tasksFromLoaded.size()); |
| } |
| |
| @Test |
| public void testRevokedLockSyncFromStorage() throws EntryExistsException |
| { |
| final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| |
| final Task task1 = NoopTask.create("task1", 10); |
| taskStorage.insert(task1, TaskStatus.running(task1.getId())); |
| originalBox.add(task1); |
| Assert.assertTrue(originalBox.tryLock(task1, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018"), null)).isOk()); |
| |
| // task2 revokes task1 |
| final Task task2 = NoopTask.create("task2", 100); |
| taskStorage.insert(task2, TaskStatus.running(task2.getId())); |
| originalBox.add(task2); |
| Assert.assertTrue(originalBox.tryLock(task2, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018"), null)).isOk()); |
| |
| final Map<String, List<TaskLock>> beforeLocksInStorage = taskStorage |
| .getActiveTasks() |
| .stream() |
| .collect(Collectors.toMap(Task::getId, task -> taskStorage.getLocks(task.getId()))); |
| |
| final List<TaskLock> task1Locks = beforeLocksInStorage.get("task1"); |
| Assert.assertEquals(1, task1Locks.size()); |
| Assert.assertTrue(task1Locks.get(0).isRevoked()); |
| |
| final List<TaskLock> task2Locks = beforeLocksInStorage.get("task1"); |
| Assert.assertEquals(1, task2Locks.size()); |
| Assert.assertTrue(task2Locks.get(0).isRevoked()); |
| |
| final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); |
| newBox.syncFromStorage(); |
| |
| final Set<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() |
| .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) |
| .collect(Collectors.toSet()); |
| |
| Assert.assertEquals( |
| beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()), |
| afterLocksInStorage |
| ); |
| } |
| |
| @Test |
| public void testDoInCriticalSectionWithSharedLock() throws Exception |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-01-02"); |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, interval).isOk()); |
| |
| Assert.assertFalse( |
| lockbox.doInCriticalSection( |
| task, |
| Collections.singletonList(interval), |
| CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() |
| ) |
| ); |
| } |
| |
| @Test |
| public void testDoInCriticalSectionWithExclusiveLock() throws Exception |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-01-02"); |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| final TaskLock lock = tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock(); |
| Assert.assertNotNull(lock); |
| |
| Assert.assertTrue( |
| lockbox.doInCriticalSection( |
| task, |
| Collections.singletonList(interval), |
| CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() |
| ) |
| ); |
| } |
| |
| @Test |
| public void testDoInCriticalSectionWithSmallerInterval() throws Exception |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-02-01"); |
| final Interval smallInterval = Intervals.of("2017-01-10/2017-01-11"); |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| final TaskLock lock = tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock(); |
| Assert.assertNotNull(lock); |
| |
| Assert.assertTrue( |
| lockbox.doInCriticalSection( |
| task, |
| Collections.singletonList(interval), |
| CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() |
| ) |
| ); |
| } |
| |
| @Test |
| public void testPreemptionAndDoInCriticalSection() throws Exception |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-01-02"); |
| for (int i = 0; i < 5; i++) { |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, interval).isOk()); |
| } |
| |
| final Task highPriorityTask = NoopTask.create(100); |
| lockbox.add(highPriorityTask); |
| taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId())); |
| final TaskLock lock = tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).getTaskLock(); |
| Assert.assertNotNull(lock); |
| |
| Assert.assertTrue( |
| lockbox.doInCriticalSection( |
| highPriorityTask, |
| Collections.singletonList(interval), |
| CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() |
| ) |
| ); |
| } |
| |
| @Test |
| public void testDoInCriticalSectionWithRevokedLock() throws Exception |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-01-02"); |
| final Task lowPriorityTask = NoopTask.create("task1", 0); |
| final Task highPriorityTask = NoopTask.create("task2", 10); |
| lockbox.add(lowPriorityTask); |
| lockbox.add(highPriorityTask); |
| taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); |
| taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId())); |
| |
| final TaskLock lowPriorityLock = tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval).getTaskLock(); |
| Assert.assertNotNull(lowPriorityLock); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).isOk()); |
| Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked()); |
| |
| Assert.assertFalse( |
| lockbox.doInCriticalSection( |
| lowPriorityTask, |
| Collections.singletonList(interval), |
| CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() |
| ) |
| ); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testAcquireLockAfterRevoked() throws EntryExistsException, InterruptedException |
| { |
| final Interval interval = Intervals.of("2017-01-01/2017-01-02"); |
| final Task lowPriorityTask = NoopTask.create("task1", 0); |
| final Task highPriorityTask = NoopTask.create("task2", 10); |
| lockbox.add(lowPriorityTask); |
| lockbox.add(highPriorityTask); |
| taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); |
| taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId())); |
| |
| final TaskLock lowPriorityLock = acquireTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval).getTaskLock(); |
| Assert.assertNotNull(lowPriorityLock); |
| Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).isOk()); |
| Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked()); |
| |
| lockbox.unlock(highPriorityTask, interval); |
| |
| // Acquire again |
| final LockResult lockResult = acquireTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval); |
| Assert.assertFalse(lockResult.isOk()); |
| Assert.assertTrue(lockResult.isRevoked()); |
| Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked()); |
| } |
| |
| @Test |
| public void testUnlock() throws EntryExistsException |
| { |
| final List<Task> lowPriorityTasks = new ArrayList<>(); |
| final List<Task> highPriorityTasks = new ArrayList<>(); |
| |
| for (int i = 0; i < 8; i++) { |
| final Task task = NoopTask.create(10); |
| lowPriorityTasks.add(task); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| lockbox.add(task); |
| Assert.assertTrue( |
| tryTimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task, |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))) |
| ).isOk() |
| ); |
| } |
| |
| // Revoke some locks |
| for (int i = 0; i < 4; i++) { |
| final Task task = NoopTask.create(100); |
| highPriorityTasks.add(task); |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| lockbox.add(task); |
| Assert.assertTrue( |
| tryTimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task, |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))) |
| ).isOk() |
| ); |
| } |
| |
| for (int i = 0; i < 4; i++) { |
| Assert.assertTrue(taskStorage.getLocks(lowPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked)); |
| Assert.assertFalse(taskStorage.getLocks(highPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked)); |
| } |
| |
| for (int i = 4; i < 8; i++) { |
| Assert.assertFalse(taskStorage.getLocks(lowPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked)); |
| } |
| |
| for (int i = 0; i < 4; i++) { |
| lockbox.unlock( |
| lowPriorityTasks.get(i), |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))) |
| ); |
| lockbox.unlock( |
| highPriorityTasks.get(i), |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))) |
| ); |
| } |
| |
| for (int i = 4; i < 8; i++) { |
| lockbox.unlock( |
| lowPriorityTasks.get(i), |
| Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))) |
| ); |
| } |
| |
| Assert.assertTrue(lockbox.getAllLocks().isEmpty()); |
| } |
| |
| @Test |
| public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException |
| { |
| final Task lowPriorityTask = NoopTask.create(0); |
| final Task highPriorityTask = NoopTask.create(10); |
| |
| taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); |
| taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId())); |
| lockbox.add(lowPriorityTask); |
| lockbox.add(highPriorityTask); |
| |
| Assert.assertTrue( |
| tryTimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| lowPriorityTask, |
| Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") |
| ).isOk() |
| ); |
| |
| Assert.assertTrue( |
| tryTimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| highPriorityTask, |
| Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") |
| ).isOk() |
| ); |
| |
| final List<TaskLockPosse> highLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval( |
| highPriorityTask, |
| Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") |
| ); |
| |
| Assert.assertEquals(1, highLockPosses.size()); |
| Assert.assertTrue(highLockPosses.get(0).containsTask(highPriorityTask)); |
| Assert.assertFalse(highLockPosses.get(0).getTaskLock().isRevoked()); |
| |
| final List<TaskLockPosse> lowLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval( |
| lowPriorityTask, |
| Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") |
| ); |
| |
| Assert.assertEquals(1, lowLockPosses.size()); |
| Assert.assertTrue(lowLockPosses.get(0).containsTask(lowPriorityTask)); |
| Assert.assertTrue(lowLockPosses.get(0).getTaskLock().isRevoked()); |
| } |
| |
| @Test |
| public void testSegmentLock() throws InterruptedException |
| { |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| final LockResult lockResult = lockbox.lock( |
| task, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ); |
| Assert.assertTrue(lockResult.isOk()); |
| Assert.assertNull(lockResult.getNewSegmentId()); |
| Assert.assertTrue(lockResult.getTaskLock() instanceof SegmentLock); |
| final SegmentLock segmentLock = (SegmentLock) lockResult.getTaskLock(); |
| Assert.assertEquals(TaskLockType.EXCLUSIVE, segmentLock.getType()); |
| Assert.assertEquals(task.getGroupId(), segmentLock.getGroupId()); |
| Assert.assertEquals(task.getDataSource(), segmentLock.getDataSource()); |
| Assert.assertEquals(Intervals.of("2015-01-01/2015-01-02"), segmentLock.getInterval()); |
| Assert.assertEquals("v1", segmentLock.getVersion()); |
| Assert.assertEquals(3, segmentLock.getPartitionId()); |
| Assert.assertEquals(task.getPriority(), segmentLock.getPriority().intValue()); |
| Assert.assertFalse(segmentLock.isRevoked()); |
| } |
| |
| @Test |
| public void testSegmentAndTimeChunkLockForSameInterval() |
| { |
| final Task task1 = NoopTask.create(); |
| lockbox.add(task1); |
| |
| final Task task2 = NoopTask.create(); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| |
| Assert.assertFalse( |
| lockbox.tryLock( |
| task2, |
| new TimeChunkLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task2, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1" |
| ) |
| ).isOk() |
| ); |
| } |
| |
| @Test |
| public void testSegmentAndTimeChunkLockForSameIntervalWithDifferentPriority() throws EntryExistsException |
| { |
| final Task task1 = NoopTask.create(10); |
| lockbox.add(task1); |
| taskStorage.insert(task1, TaskStatus.running(task1.getId())); |
| |
| final Task task2 = NoopTask.create(100); |
| lockbox.add(task2); |
| taskStorage.insert(task2, TaskStatus.running(task2.getId())); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task2, |
| new TimeChunkLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task2, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1" |
| ) |
| ).isOk() |
| ); |
| |
| final LockResult resultOfTask1 = lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ); |
| Assert.assertFalse(resultOfTask1.isOk()); |
| Assert.assertTrue(resultOfTask1.isRevoked()); |
| } |
| |
| @Test |
| public void testSegmentLockForSameIntervalAndSamePartition() |
| { |
| final Task task1 = NoopTask.create(); |
| lockbox.add(task1); |
| |
| final Task task2 = NoopTask.create(); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| |
| Assert.assertFalse( |
| lockbox.tryLock( |
| task2, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task2, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| } |
| |
| @Test |
| public void testSegmentLockForSameIntervalDifferentPartition() |
| { |
| final Task task1 = NoopTask.create(); |
| lockbox.add(task1); |
| |
| final Task task2 = NoopTask.create(); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task2, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task2, |
| Intervals.of("2015-01-01/2015-01-02"), |
| "v1", |
| 2 |
| ) |
| ).isOk() |
| ); |
| } |
| |
| @Test |
| public void testSegmentLockForOverlappedIntervalDifferentPartition() |
| { |
| final Task task1 = NoopTask.create(); |
| lockbox.add(task1); |
| |
| final Task task2 = NoopTask.create(); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task1, |
| Intervals.of("2015-01-01/2015-01-05"), |
| "v1", |
| 3 |
| ) |
| ).isOk() |
| ); |
| |
| Assert.assertFalse( |
| lockbox.tryLock( |
| task2, |
| new SpecificSegmentLockRequest( |
| TaskLockType.EXCLUSIVE, |
| task2, |
| Intervals.of("2015-01-03/2015-01-08"), |
| "v1", |
| 2 |
| ) |
| ).isOk() |
| ); |
| } |
| |
| @Test |
| public void testRequestForNewSegmentWithSegmentLock() |
| { |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| allocateSegmentsAndAssert(task, "seq", 3, NumberedPartialShardSpec.instance()); |
| allocateSegmentsAndAssert(task, "seq2", 2, new NumberedOverwritePartialShardSpec(0, 3, (short) 1)); |
| |
| final List<TaskLock> locks = lockbox.findLocksForTask(task); |
| Assert.assertEquals(5, locks.size()); |
| int expectedPartitionId = 0; |
| for (TaskLock lock : locks) { |
| Assert.assertTrue(lock instanceof SegmentLock); |
| final SegmentLock segmentLock = (SegmentLock) lock; |
| Assert.assertEquals(expectedPartitionId++, segmentLock.getPartitionId()); |
| if (expectedPartitionId == 3) { |
| expectedPartitionId = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID; |
| } |
| } |
| } |
| |
| @Test |
| public void testRequestForNewSegmentWithHashPartition() |
| { |
| final Task task = NoopTask.create(); |
| lockbox.add(task); |
| |
| allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3, null)); |
| allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5, null)); |
| } |
| |
| private void allocateSegmentsAndAssert( |
| Task task, |
| String baseSequenceName, |
| int numSegmentsToAllocate, |
| PartialShardSpec partialShardSpec |
| ) |
| { |
| for (int i = 0; i < numSegmentsToAllocate; i++) { |
| final LockRequestForNewSegment request = new LockRequestForNewSegment( |
| LockGranularity.SEGMENT, |
| TaskLockType.EXCLUSIVE, |
| task, |
| Intervals.of("2015-01-01/2015-01-05"), |
| partialShardSpec, |
| StringUtils.format("%s_%d", baseSequenceName, i), |
| null, |
| true |
| ); |
| assertAllocatedSegments(request, lockbox.tryLock(task, request)); |
| } |
| } |
| |
| private void assertAllocatedSegments( |
| LockRequestForNewSegment lockRequest, |
| LockResult result |
| ) |
| { |
| Assert.assertTrue(result.isOk()); |
| Assert.assertNotNull(result.getTaskLock()); |
| Assert.assertTrue(result.getTaskLock() instanceof SegmentLock); |
| Assert.assertNotNull(result.getNewSegmentId()); |
| final SegmentLock segmentLock = (SegmentLock) result.getTaskLock(); |
| final SegmentIdWithShardSpec segmentId = result.getNewSegmentId(); |
| |
| Assert.assertEquals(lockRequest.getType(), segmentLock.getType()); |
| Assert.assertEquals(lockRequest.getGroupId(), segmentLock.getGroupId()); |
| Assert.assertEquals(lockRequest.getDataSource(), segmentLock.getDataSource()); |
| Assert.assertEquals(lockRequest.getInterval(), segmentLock.getInterval()); |
| Assert.assertEquals(lockRequest.getPartialShardSpec().getShardSpecClass(), segmentId.getShardSpec().getClass()); |
| Assert.assertEquals(lockRequest.getPriority(), lockRequest.getPriority()); |
| } |
| |
| @Test |
| public void testLockPosseEquals() |
| { |
| final Task task1 = NoopTask.create(); |
| final Task task2 = NoopTask.create(); |
| |
| TaskLock taskLock1 = new TimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task1.getGroupId(), |
| task1.getDataSource(), |
| Intervals.of("2018/2019"), |
| "v1", |
| task1.getPriority() |
| ); |
| |
| TaskLock taskLock2 = new TimeChunkLock( |
| TaskLockType.EXCLUSIVE, |
| task2.getGroupId(), |
| task2.getDataSource(), |
| Intervals.of("2018/2019"), |
| "v2", |
| task2.getPriority() |
| ); |
| |
| TaskLockPosse taskLockPosse1 = new TaskLockPosse(taskLock1); |
| TaskLockPosse taskLockPosse2 = new TaskLockPosse(taskLock2); |
| TaskLockPosse taskLockPosse3 = new TaskLockPosse(taskLock1); |
| |
| Assert.assertNotEquals(taskLockPosse1, null); |
| Assert.assertNotEquals(null, taskLockPosse1); |
| Assert.assertNotEquals(taskLockPosse1, taskLockPosse2); |
| Assert.assertEquals(taskLockPosse1, taskLockPosse3); |
| } |
| |
| @Test |
| public void testGetTimeChunkAndSegmentLockForSameGroup() |
| { |
| final Task task1 = NoopTask.withGroupId("groupId"); |
| final Task task2 = NoopTask.withGroupId("groupId"); |
| |
| lockbox.add(task1); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018"), null) |
| ).isOk() |
| ); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task2, |
| new SpecificSegmentLockRequest(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018"), "version", 0) |
| ).isOk() |
| ); |
| |
| final List<TaskLockPosse> posses = lockbox |
| .getAllLocks() |
| .get(task1.getDataSource()) |
| .get(DateTimes.of("2017")) |
| .get(Intervals.of("2017/2018")); |
| Assert.assertEquals(2, posses.size()); |
| |
| Assert.assertEquals(LockGranularity.TIME_CHUNK, posses.get(0).getTaskLock().getGranularity()); |
| final TimeChunkLock timeChunkLock = (TimeChunkLock) posses.get(0).getTaskLock(); |
| Assert.assertEquals("none", timeChunkLock.getDataSource()); |
| Assert.assertEquals("groupId", timeChunkLock.getGroupId()); |
| Assert.assertEquals(Intervals.of("2017/2018"), timeChunkLock.getInterval()); |
| |
| Assert.assertEquals(LockGranularity.SEGMENT, posses.get(1).getTaskLock().getGranularity()); |
| final SegmentLock segmentLock = (SegmentLock) posses.get(1).getTaskLock(); |
| Assert.assertEquals("none", segmentLock.getDataSource()); |
| Assert.assertEquals("groupId", segmentLock.getGroupId()); |
| Assert.assertEquals(Intervals.of("2017/2018"), segmentLock.getInterval()); |
| Assert.assertEquals(0, segmentLock.getPartitionId()); |
| } |
| |
| @Test |
| public void testGetTimeChunkAndSegmentLockForDifferentGroup() |
| { |
| final Task task1 = NoopTask.withGroupId("groupId"); |
| final Task task2 = NoopTask.withGroupId("groupId2"); |
| |
| lockbox.add(task1); |
| lockbox.add(task2); |
| |
| Assert.assertTrue( |
| lockbox.tryLock( |
| task1, |
| new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018"), null) |
| ).isOk() |
| ); |
| |
| Assert.assertFalse( |
| lockbox.tryLock( |
| task2, |
| new SpecificSegmentLockRequest(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018"), "version", 0) |
| ).isOk() |
| ); |
| } |
| |
| private Set<TaskLock> getAllLocks(List<Task> tasks) |
| { |
| return tasks.stream() |
| .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) |
| .collect(Collectors.toSet()); |
| } |
| |
| private static class IntervalLockWithoutPriority extends TimeChunkLock |
| { |
| @JsonCreator |
| IntervalLockWithoutPriority( |
| String groupId, |
| String dataSource, |
| Interval interval, |
| String version |
| ) |
| { |
| super(null, groupId, dataSource, interval, version, null, false); |
| } |
| |
| @Override |
| @JsonProperty |
| public TaskLockType getType() |
| { |
| return super.getType(); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getGroupId() |
| { |
| return super.getGroupId(); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getDataSource() |
| { |
| return super.getDataSource(); |
| } |
| |
| @Override |
| @JsonProperty |
| public Interval getInterval() |
| { |
| return super.getInterval(); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getVersion() |
| { |
| return super.getVersion(); |
| } |
| |
| @JsonIgnore |
| @Override |
| public Integer getPriority() |
| { |
| return super.getPriority(); |
| } |
| |
| @JsonIgnore |
| @Override |
| public boolean isRevoked() |
| { |
| return super.isRevoked(); |
| } |
| } |
| |
| private static String TASK_NAME = "myModuleIsntLoadedTask"; |
| |
| private static class TheModule extends SimpleModule |
| { |
| public TheModule() |
| { |
| registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME)); |
| } |
| } |
| |
| private static class MyModuleIsntLoadedTask extends AbstractTask |
| { |
| private String someProp; |
| |
| @JsonCreator |
| protected MyModuleIsntLoadedTask( |
| @JsonProperty("id") String id, |
| @JsonProperty("dataSource") String dataSource, |
| @JsonProperty("context") Map<String, Object> context, |
| @JsonProperty("someProp") String someProp |
| ) |
| { |
| super(id, dataSource, context); |
| this.someProp = someProp; |
| } |
| |
| @JsonProperty |
| public String getSomeProp() |
| { |
| return someProp; |
| } |
| |
| @Override |
| public String getType() |
| { |
| return TASK_NAME; |
| } |
| |
| @Override |
| public boolean isReady(TaskActionClient taskActionClient) |
| { |
| return true; |
| } |
| |
| @Override |
| public void stopGracefully(TaskConfig taskConfig) |
| { |
| } |
| |
| @Override |
| public TaskStatus run(TaskToolbox toolbox) |
| { |
| return TaskStatus.failure("how?"); |
| } |
| } |
| } |