| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.util.core.task; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.brooklyn.api.mgmt.HasTaskChildren; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.test.Asserts; |
| import org.apache.brooklyn.util.collections.CollectionFunctionals; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.math.MathPredicates; |
| import org.apache.brooklyn.util.time.CountdownTimer; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Predicates; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Suppliers; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| |
| public class DynamicSequentialTaskTest { |
| |
| private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class); |
| |
| public static final Duration TIMEOUT = Duration.TEN_SECONDS; |
| public static final Duration TINY_TIME = Duration.millis(20); |
| |
| BasicExecutionManager em; |
| BasicExecutionContext ec; |
| List<String> messages; |
| Semaphore cancellations; |
| Stopwatch stopwatch; |
| Map<String,Semaphore> monitorableJobSemaphoreMap; |
| Map<String,Task<String>> monitorableTasksMap; |
| |
| @BeforeMethod(alwaysRun=true) |
| public void setUp() { |
| em = new BasicExecutionManager("mycontext"); |
| ec = new BasicExecutionContext(em); |
| cancellations = new Semaphore(0); |
| messages = new ArrayList<String>(); |
| monitorableJobSemaphoreMap = MutableMap.of(); |
| monitorableTasksMap = MutableMap.of(); |
| monitorableTasksMap.clear(); |
| stopwatch = Stopwatch.createStarted(); |
| } |
| |
| @AfterMethod(alwaysRun=true) |
| public void tearDown() throws Exception { |
| if (em != null) em.shutdownNow(); |
| } |
| |
| @Test |
| public void testSimple() throws Exception { |
| Callable<String> mainJob = new Callable<String>() { |
| public String call() { |
| log.info("main job - "+Tasks.current()); |
| messages.add("main"); |
| DynamicTasks.queue( sayTask("world") ); |
| return "bye"; |
| } |
| }; |
| DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob); |
| // this should be added before anything added when the task is invoked |
| t.queue(sayTask("hello")); |
| |
| Assert.assertEquals(messages, Lists.newArrayList()); |
| Assert.assertEquals(t.isBegun(), false); |
| Assert.assertEquals(Iterables.size(t.getChildren()), 1); |
| |
| ec.submit(t); |
| Assert.assertEquals(t.isSubmitted(), true); |
| Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye"); |
| long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc(); |
| Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+ |
| Time.makeTimeString(elapsed, true)); |
| Assert.assertEquals(Iterables.size(t.getChildren()), 2); |
| Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages); |
| // either main or hello can be first, but world should be last |
| Assert.assertEquals(messages.get(2), "world"); |
| } |
| |
| public Callable<String> sayCallable(final String message, final Duration duration, final String message2) { |
| return new Callable<String>() { |
| public String call() { |
| try { |
| if (message != null) { |
| log.info("saying: "+message+ " - "+Tasks.current()); |
| synchronized (messages) { |
| messages.add(message); |
| messages.notifyAll(); |
| } |
| } |
| if (message2 != null) { |
| log.info("will say "+message2+" after "+duration); |
| } |
| if (duration != null && duration.toMilliseconds() > 0) { |
| Thread.sleep(duration.toMillisecondsRoundingUp()); |
| } |
| } catch (InterruptedException e) { |
| log.info("releasing semaphore on interruption after saying "+message); |
| cancellations.release(); |
| throw Exceptions.propagate(e); |
| } |
| if (message2 != null) { |
| log.info("saying: "+message2+ " - "+Tasks.current()); |
| synchronized (messages) { |
| messages.add(message2); |
| messages.notifyAll(); |
| } |
| } |
| return message; |
| } |
| }; |
| } |
| |
| public Task<String> sayTask(String message) { |
| return sayTask(message, null, null); |
| } |
| |
| public Task<String> sayTask(String message, Duration duration, String message2) { |
| return Tasks.<String>builder().displayName("say:"+message+(duration!=null ? ":wait("+duration+")" : "")+(message2!=null ? ":"+message2 : "")) |
| .body(sayCallable(message, duration, message2)).build(); |
| } |
| |
| public <T> Task<T> submitting(final Task<T> task) { |
| return Tasks.<T>builder().displayName("submitting:"+task.getId()).body(new Callable<T>() { |
| @Override |
| public T call() throws Exception { |
| ec.submit(task); |
| return task.get(); |
| } |
| }).build(); |
| } |
| |
| @Test |
| public void testComplex() throws Exception { |
| Task<List<?>> t = Tasks.sequential( |
| sayTask("1"), |
| sayTask("2"), |
| Tasks.parallel(sayTask("4"), sayTask("3")), |
| sayTask("5") |
| ); |
| ec.submit(t); |
| Assert.assertEquals(t.get().size(), 4); |
| Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4")); |
| Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages); |
| } |
| |
| @Test |
| public void testCancelled() throws Exception { |
| Task<List<?>> t = Tasks.sequential( |
| sayTask("1"), |
| sayTask("2a", Duration.THIRTY_SECONDS, "2b"), |
| sayTask("3")); |
| ec.submit(t); |
| |
| // wait for 2 to start, saying "2a", and the first interruptible block is when it waits for its 30s |
| waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT); |
| Assert.assertEquals(messages, Arrays.asList("1", "2a")); |
| |
| // now cancel, and make sure we get the right behaviour |
| t.cancel(true); |
| Assert.assertTrue(t.isDone()); |
| // 2 should get cancelled, and invoke the cancellation semaphore, but not say 2b |
| // 3 should get cancelled and not run at all |
| |
| // cancel(..) currently cancels everything in the tree in the calling thread |
| // so we could even assert task3.isCancelled() now |
| // but not sure we will guarantee that for subtasks, so weaker assertion |
| // that it is eventually cancelled, and that it for sure never starts |
| |
| // message list is still 1, 2a |
| Assert.assertEquals(messages, Arrays.asList("1", "2a")); |
| // And 2 when cancelled should release the semaphore |
| log.info("testCancelled waiting on semaphore; permits left is "+cancellations.availablePermits()); |
| Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS)); |
| log.info("testCancelled acquired semaphore; permits left is "+cancellations.availablePermits()); |
| |
| Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator(); |
| // 1 completed fine |
| Assert.assertEquals(ci.next().get(), "1"); |
| // 2 is cancelled -- cancelled flag should always be set *before* the interrupt sent |
| // (and that released the semaphore above, even if thread is not completed, so this should be set) |
| Task<?> task2 = ci.next(); |
| Assert.assertTrue(task2.isBegun()); |
| Assert.assertTrue(task2.isDone()); |
| Assert.assertTrue(task2.isCancelled()); |
| |
| Task<?> task3 = ci.next(); |
| // 3 is being cancelled in the thread which cancelled 2, and should either be |
| // *before* 2 was cancelled or *not run* because the parent was cancelled |
| // so we shouldn't need to wait ... but if we did: |
| // Asserts.eventually(Suppliers.ofInstance(task3), TaskPredicates.isDone()); |
| Assert.assertTrue(task3.isDone()); |
| Assert.assertTrue(task3.isCancelled()); |
| Assert.assertFalse(task3.isBegun()); |
| // messages unchanged |
| Assert.assertEquals(messages, Arrays.asList("1", "2a")); |
| // no further mutexes should be available (ie 3 should not run) |
| // TODO for some reason this was observed to fail on the jenkins box (2016-01) |
| // but i can't see why; have added logging in case it happens again |
| Assert.assertEquals(cancellations.availablePermits(), 0); |
| } |
| |
| @Test |
| public void testCancellationModeAndSubmitted() throws Exception { |
| doTestCancellationModeAndSubmitted(true, TaskCancellationMode.DO_NOT_INTERRUPT, false, false); |
| |
| doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true); |
| doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, true); |
| doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, true, false); |
| |
| // if it's not transient, it should only be cancelled on "all submitted" |
| doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, false); |
| doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true); |
| |
| // cancellation mode left off should be the same as TASK_AND_DEPENDENT, i.e. don't cancel non-transient bg submitted |
| doTestCancellationModeAndSubmitted(true, null, true, true); |
| doTestCancellationModeAndSubmitted(false, null, true, false); |
| // and 'true' should be the same |
| doTestCancellationModeAndSubmitted(true, true, true, true); |
| doTestCancellationModeAndSubmitted(false, true, true, false); |
| |
| // cancellation mode false should be the same as DO_NOT_INTERRUPT |
| doTestCancellationModeAndSubmitted(true, false, false, false); |
| } |
| |
| public void doTestCancellationModeAndSubmitted( |
| boolean isSubtaskTransient, |
| Object cancellationMode, |
| boolean expectedTaskInterrupted, |
| boolean expectedSubtaskCancelled |
| ) throws Exception { |
| tearDown(); setUp(); |
| |
| final Task<String> t1 = sayTask("1-wait", Duration.minutes(10), "1-done"); |
| if (isSubtaskTransient) { |
| BrooklynTaskTags.addTagDynamically(t1, BrooklynTaskTags.TRANSIENT_TASK_TAG); |
| } |
| |
| final Task<List<?>> t = Tasks.parallel( |
| submitting(t1), |
| sayTask("2-wait", Duration.minutes(10), "2-done")); |
| ec.submit(t); |
| |
| waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT); |
| Asserts.assertEquals(MutableSet.copyOf(messages), MutableSet.of("1-wait", "2-wait")); |
| |
| if (cancellationMode==null) { |
| ((TaskInternal<?>)t).cancel(); |
| } else if (cancellationMode instanceof Boolean) { |
| t.cancel((Boolean)cancellationMode); |
| } else if (cancellationMode instanceof TaskCancellationMode) { |
| ((TaskInternal<?>)t).cancel((TaskCancellationMode)cancellationMode); |
| } else { |
| throw new IllegalStateException("Invalid cancellationMode: "+cancellationMode); |
| } |
| |
| // the cancelled task always reports cancelled and done |
| Assert.assertEquals(t.isDone(), true); |
| Assert.assertEquals(t.isCancelled(), true); |
| // end time might not be set for another fraction of a second |
| if (expectedTaskInterrupted) { |
| Asserts.eventually(new Supplier<Number>() { |
| @Override public Number get() { return t.getEndTimeUtc(); }}, |
| MathPredicates.<Number>greaterThanOrEqual(0)); |
| } else { |
| Assert.assertTrue(t.getEndTimeUtc() < 0, "Wrong end time: "+t.getEndTimeUtc()); |
| } |
| |
| if (expectedSubtaskCancelled) { |
| Asserts.eventually(Suppliers.ofInstance(t1), TaskPredicates.isDone()); |
| Assert.assertTrue(t1.isCancelled()); |
| Asserts.eventually(new Supplier<Number>() { |
| @Override public Number get() { return t1.getEndTimeUtc(); }}, |
| MathPredicates.<Number>greaterThanOrEqual(0)); |
| } else { |
| Time.sleep(Duration.millis(5)); |
| Assert.assertFalse(t1.isCancelled()); |
| Assert.assertFalse(t1.isDone()); |
| } |
| } |
| |
| protected void waitForMessages(Predicate<? super List<String>> predicate, Duration timeout) throws Exception { |
| long endtime = System.currentTimeMillis() + timeout.toMilliseconds(); |
| synchronized (messages) { |
| while (true) { |
| if (predicate.apply(messages)) { |
| return; |
| } |
| long waittime = endtime - System.currentTimeMillis(); |
| if (waittime > 0) { |
| messages.wait(waittime); |
| } else { |
| throw new TimeoutException("Timeout after "+timeout+"; messages="+messages+"; predicate="+predicate); |
| } |
| } |
| } |
| } |
| |
| protected Task<String> monitorableTask(final String id) { |
| return monitorableTask(null, id, null); |
| } |
| protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) { |
| Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build(); |
| monitorableTasksMap.put(id, t); |
| return t; |
| } |
| protected Callable<String> monitorableJob(final String id) { |
| return monitorableJob(null, id, null); |
| } |
| protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) { |
| monitorableJobSemaphoreMap.put(id, new Semaphore(0)); |
| return new Callable<String>() { |
| @Override |
| public String call() throws Exception { |
| if (pre!=null) pre.run(); |
| // wait for semaphore |
| if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)) |
| throw new IllegalStateException("timeout for "+id); |
| synchronized (messages) { |
| messages.add(id); |
| messages.notifyAll(); |
| } |
| if (post!=null) return post.call(); |
| return id; |
| } |
| }; |
| } |
| protected void releaseMonitorableJob(final String id) { |
| monitorableJobSemaphoreMap.get(id).release(); |
| } |
| protected void waitForMessage(final String id) { |
| CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT); |
| synchronized (messages) { |
| while (!timer.isExpired()) { |
| if (messages.contains(id)) return; |
| timer.waitOnForExpiryUnchecked(messages); |
| } |
| } |
| Assert.fail("Did not see message "+id); |
| } |
| protected void releaseAndWaitForMonitorableJob(final String id) { |
| releaseMonitorableJob(id); |
| waitForMessage(id); |
| } |
| |
| @Test |
| public void testChildrenRunConcurrentlyWithPrimary() { |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .add(monitorableTask("1")).add(monitorableTask("2")).build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| releaseAndWaitForMonitorableJob("main"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseMonitorableJob("2"); |
| |
| Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); |
| Assert.assertEquals(messages, MutableList.of("1", "main", "2")); |
| Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); |
| Assert.assertFalse(t.isError()); |
| } |
| |
| protected static class FailRunnable implements Runnable { |
| @Override public void run() { throw new RuntimeException("Planned exception for test"); } |
| } |
| protected static class FailCallable implements Callable<String> { |
| @Override public String call() { throw new RuntimeException("Planned exception for test"); } |
| } |
| |
| @Test |
| public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() { |
| Task<String> t1 = monitorableTask(null, "1", new FailCallable()); |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .add(t1).add(monitorableTask("2")).build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseMonitorableJob("main"); |
| |
| Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); |
| Assert.assertEquals(messages, MutableList.of("1", "main")); |
| Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); |
| Assert.assertTrue(t.isError()); |
| Assert.assertTrue(t1.isError()); |
| } |
| |
| @Test |
| public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() { |
| Task<String> t1 = monitorableTask(null, "1", new FailCallable()); |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseAndWaitForMonitorableJob("2"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseMonitorableJob("main"); |
| Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); |
| Assert.assertEquals(messages, MutableList.of("1", "2", "main")); |
| Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); |
| Assert.assertFalse(t.isError()); |
| Assert.assertTrue(t1.isError()); |
| } |
| |
| @Test |
| public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() { |
| Task<String> t1 = monitorableTask(null, "1", new FailCallable()); |
| TaskTags.markInessential(t1); |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .add(t1).add(monitorableTask("2")).build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseAndWaitForMonitorableJob("2"); |
| Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); |
| releaseMonitorableJob("main"); |
| Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); |
| Assert.assertEquals(messages, MutableList.of("1", "2", "main")); |
| Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); |
| Assert.assertFalse(t.isError()); |
| Assert.assertTrue(t1.isError()); |
| } |
| |
| @Test |
| public void testTaskBuilderUsingAddVarargChildren() { |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .add(monitorableTask("1"), monitorableTask("2")) |
| .build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| releaseAndWaitForMonitorableJob("2"); |
| releaseAndWaitForMonitorableJob("main"); |
| |
| Assert.assertEquals(messages, MutableList.of("1", "2", "main")); |
| } |
| |
| @Test |
| public void testTaskBuilderUsingAddAllChildren() { |
| Task<String> t = Tasks.<String>builder().dynamic(true) |
| .body(monitorableJob("main")) |
| .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2"))) |
| .build(); |
| ec.submit(t); |
| releaseAndWaitForMonitorableJob("1"); |
| releaseAndWaitForMonitorableJob("2"); |
| releaseAndWaitForMonitorableJob("main"); |
| |
| Assert.assertEquals(messages, MutableList.of("1", "2", "main")); |
| } |
| } |