This closes #1160
diff --git a/brooklyn-docs/guide/misc/release-notes.md b/brooklyn-docs/guide/misc/release-notes.md
index f65e77d..3661ab1 100644
--- a/brooklyn-docs/guide/misc/release-notes.md
+++ b/brooklyn-docs/guide/misc/release-notes.md
@@ -51,3 +51,8 @@
For changes in prior versions, please refer to the release notes for
[0.8.0](/v/0.8.0-incubating/misc/release-notes.html).
+
+3. Task cancellation is now propagated to dependent submitted tasks, including backgrounded tasks if they are transient.
+Previously, when a task was cancelled, the API did not guarantee any semantics, and in practice sub-tasks were cancelled only
+in very limited cases. Now the semantics are more precise and controllable, and more sub-tasks are cancelled.
+This can prevent some leaked waits on `attributeWhenReady`.
diff --git a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
index c8f1c00..42147c5 100644
--- a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
+++ b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
@@ -81,6 +81,24 @@
public boolean isError();
/**
+ * As {@link Future#isDone()}. In particular if cancelled, this will return true
+ * as soon as it is cancelled. The thread for this task may still be running,
+ * if the cancellation (often an interruption, but may be weaker) has not applied,
+ * and submitted threads may also be running depending on cancellation parameters.
+ * <p>
+ * {@link #get()} is guaranteed to return immediately, throwing in the case of cancellation
+ * prior to completion (and including the case above where a thread may still be running).
+ * <p>
+ * To check whether cancelled threads for this task have completed,
+ * inspect {@link #getEndTimeUtc()}, which is guaranteed to be set when threads complete
+ * if the thread is started (as determinable by whether {@link #getStartTimeUtc()} is set).
+ * (The threads of submitted/child tasks will usually be independent; to determine their
+ * completion requires inspecting the {@link ExecutionManager}.)
+ */
+ @Override
+ public boolean isDone();
+
+ /**
* Causes calling thread to block until the task is started.
*/
public void blockUntilStarted();
diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index a417e32..48a0283 100644
--- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -115,7 +115,7 @@
try {
if (log.isDebugEnabled())
- log.debug("Queuing task to resolve "+dsl);
+ log.debug("Queuing task to resolve "+dsl+", called by "+Tasks.current());
EntityInternal entity = (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
ExecutionContext exec =
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 8914ca4..eb4ff10 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -307,12 +307,16 @@
/* Marked transient so that the task is not needlessly kept around at the highest level.
* Note that the task is not normally visible in the GUI, because
- * (a) while it is running, the entity is parentless (and so not in the tree);
+ * (a) while it is running, the entity is often parentless (and so not in the tree);
* and (b) when it is completed it is GC'd, as it is transient.
* However task info is available via the API if you know its ID,
* and if better subtask querying is available it will be picked up as a background task
* of the parent entity creating this child entity
* (note however such subtasks are currently filtered based on parent entity so is excluded).
+ * <p>
+ * Some of these (initializers and enrichers) submit background scheduled tasks,
+ * which currently show up at the top level once the initializer task completes.
+ * TODO It would be nice if these scheduled tasks were grouped in a bucket!
*/
((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
.tag(BrooklynTaskTags.tagForContextEntity(entity))
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index ac4bef5..0c622c3 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -248,7 +248,7 @@
// return immediately if either the ready predicate or the abort conditions hold
if (ready(value)) return postProcess(value);
-
+
final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();
@@ -790,6 +790,7 @@
.displayName("waiting on "+sensor.getName())
.description("Waiting on sensor "+sensor.getName()+" from "+source)
.tag("attributeWhenReady")
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new WaitInTaskForAttributeReady<T,V>(this))
.build();
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 13dda46..74e0ddd 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -175,18 +175,18 @@
}
final Object startCallback = properties.get("newTaskStartCallback");
- properties.put("newTaskStartCallback", new Function<Object,Void>() {
- public Void apply(Object it) {
+ properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
+ public Void apply(Task<?> it) {
registerPerThreadExecutionContext();
- if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
+ if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
return null;
}});
final Object endCallback = properties.get("newTaskEndCallback");
- properties.put("newTaskEndCallback", new Function<Object,Void>() {
- public Void apply(Object it) {
+ properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
+ public Void apply(Task<?> it) {
try {
- if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
+ if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
} finally {
clearPerThreadExecutionContext();
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index d90b1a1..0aab7d5 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -19,6 +19,7 @@
package org.apache.brooklyn.util.core.task;
import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
import java.util.Collection;
import java.util.Collections;
@@ -51,16 +52,22 @@
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
@@ -368,7 +375,6 @@
return submitSubsequentScheduledTask(flags, task);
}
- @SuppressWarnings("unchecked")
protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
if (!task.isDone()) {
task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
@@ -508,9 +514,15 @@
*/
if (log.isDebugEnabled()) {
// debug only here, because most submitters will handle failures
- log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
- if (log.isTraceEnabled())
- log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+ if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) {
+ log.debug("Detected interruption on task "+task+" (rethrowing)" +
+ (Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : ""));
+ } else {
+ log.debug("Exception running task "+task+" (rethrowing): "+error);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error);
+ }
}
throw Exceptions.propagate(error);
}
@@ -526,19 +538,64 @@
}
}
- private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+ @SuppressWarnings("deprecation")
+ // TODO do we even need a listenable future here? possibly if someone wants to interrogate the future it might
+ // be interesting, so possibly it is useful that we implement ListenableFuture...
+ private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
private final Task<T> task;
+ private BasicExecutionManager execMgmt;
- private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
+ private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
super(delegate, list);
+ this.execMgmt = execMgmt;
this.task = task;
}
@Override
- public boolean cancel(boolean mayInterruptIfRunning) {
+ public boolean cancel(TaskCancellationMode mode) {
boolean result = false;
- if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
- result |= super.cancel(mayInterruptIfRunning);
+ if (log.isTraceEnabled()) {
+ log.trace("CLFFT cancelling "+task+" mode "+mode);
+ }
+ if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode);
+ result |= delegate().cancel(mode.isAllowedToInterruptTask());
+
+ if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
+ int subtasksFound=0;
+ int subtasksReallyCancelled=0;
+
+ if (task instanceof HasTaskChildren) {
+ for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Cancelling "+child+" on recursive cancellation of "+task);
+ }
+ subtasksFound++;
+ if (((TaskInternal<?>)child).cancel(mode)) {
+ result = true;
+ subtasksReallyCancelled++;
+ }
+ }
+ }
+ // TODO this is inefficient; might want to keep an index on submitted-by
+ for (Task<?> t: execMgmt.getAllTasks()) {
+ if (task.equals(t.getSubmittedByTask())) {
+ if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Cancelling "+t+" on recursive cancellation of "+task);
+ }
+ subtasksFound++;
+ if (((TaskInternal<?>)t).cancel(mode)) {
+ result = true;
+ subtasksReallyCancelled++;
+ }
+ }
+ }
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled");
+ }
+ }
+
((TaskInternal<?>)task).runListeners();
return result;
}
@@ -571,9 +628,15 @@
@SuppressWarnings("unchecked")
protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
- if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}",
+ if (log.isTraceEnabled()) {
+ log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}",
new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(),
- (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
+ (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"),
+ Tasks.current() });
+ if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
+ log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)"));
+ }
+ }
if (task instanceof ScheduledTask)
return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
@@ -604,15 +667,16 @@
} else {
future = runner.submit(job);
}
- // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
- // (and we make sure the same ExecutionList is used in the future as in the task)
- ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
- // doesn't matter whether the listener is added to the listenableFuture or the task,
- // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
- // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
- // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
+ // SubmissionCallable (above) invokes the listeners on completion;
+ // this future allows a caller to add custom listeners
+ // (it does not notify the listeners on completion; that's our job),
+ // but on cancel the future itself runs the task's listeners
+ ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
+ // and we want to make sure *our* (manager) listeners are given suitable callback
((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
+ // NB: can the above mean multiple callbacks to TaskInternal#runListeners?
+ // finally expose the future to callers
((TaskInternal<T>)task).initInternalFuture(listenableFuture);
return task;
@@ -665,9 +729,27 @@
PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
}
- ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+ invokeCallback(flags.get("newTaskStartCallback"), task);
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ // not ideal, such loose typing on the callback -- should prefer Function<Task,Object>
+ // but at least it's package-private
+ static Object invokeCallback(Object callable, Task<?> task) {
+ if (callable instanceof Closure) return ((Closure<?>)callable).call(task);
+ if (callable instanceof Callable) {
+ try {
+ return ((Callable<?>)callable).call();
+ } catch (Throwable t) {
+ throw Exceptions.propagate(t);
+ }
+ }
+ if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
+ if (callable instanceof Function) { return ((Function)callable).apply(task); }
+ if (callable==null) return null;
+ throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task);
+ }
+
/** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
internalAfterEnd(flags, task, false, true);
@@ -693,7 +775,7 @@
}
if (isEndingAllIterations) {
incompleteTaskIds.remove(task.getId());
- ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+ invokeCallback(flags.get("newTaskEndCallback"), task);
((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index 0c26dd1..7c29bba 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -155,7 +155,7 @@
(Strings.isNonEmpty(displayName) ?
displayName :
(job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
- ":"+getId()+"]";
+ "]@"+getId();
}
@Override
@@ -196,7 +196,7 @@
protected Maybe<Task<?>> submittedByTask;
protected volatile Thread thread = null;
- private volatile boolean cancelled = false;
+ protected volatile boolean cancelled = false;
/** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
protected volatile Future<T> internalFuture = null;
@@ -288,15 +288,34 @@
}
@Override
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ // semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
+ return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS
+ : TaskCancellationMode.DO_NOT_INTERRUPT);
+ }
+
+ @Override @Beta
+ public synchronized boolean cancel(TaskCancellationMode mode) {
if (isDone()) return false;
- boolean cancel = true;
- cancelled = true;
- if (internalFuture!=null) {
- cancel = internalFuture.cancel(mayInterruptIfRunning);
+ if (log.isTraceEnabled()) {
+ log.trace("BT cancelling "+this+" mode "+mode);
}
+ cancelled = true;
+ doCancel(mode);
notifyAll();
- return cancel;
+ return true;
+ }
+
+ @SuppressWarnings("deprecation")
+ protected boolean doCancel(TaskCancellationMode mode) {
+ if (internalFuture!=null) {
+ if (internalFuture instanceof ListenableForwardingFuture) {
+ return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
+ } else {
+ return internalFuture.cancel(mode.isAllowedToInterruptTask());
+ }
+ }
+ return true;
}
@Override
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
index b7985c8..51a4e34 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
@@ -158,27 +158,44 @@
}
@Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
- }
- public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
- if (isDone()) return false;
- if (log.isTraceEnabled()) log.trace("cancelling {}", this);
- boolean cancel = super.cancel(mayInterruptTask);
- if (alsoCancelChildren) {
+ protected boolean doCancel(TaskCancellationMode mode) {
+ boolean result = false;
+ if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) {
for (Task<?> t: secondaryJobsAll)
- cancel |= t.cancel(mayInterruptTask);
+ result = ((TaskInternal<?>)t).cancel(mode) || result;
}
+ return super.doCancel(mode) || result;
+ // returns true if anything is successfully cancelled
+ }
+
+ public boolean cancel(TaskCancellationMode mode) {
+ return cancel(mode, null);
+ }
+
+ protected boolean cancel(TaskCancellationMode mode, Boolean interruptPrimaryThreadOverride) {
+ if (isDone()) return false;
+ if (log.isTraceEnabled()) log.trace("cancelling DST {}", this);
+
+ // first do the super's cancel, setting cancelled, and calling doCancel to cancel children
+ boolean result = super.cancel(mode);
+ // then come back and ensure our primary thread is cancelled if needed
+
+ if (interruptPrimaryThreadOverride==null) interruptPrimaryThreadOverride = mode.isAllowedToInterruptTask();
+ if (log.isTraceEnabled()) {
+ log.trace("DST cancelling "+this+" mode "+mode+", interruptPrimary "+interruptPrimaryThreadOverride);
+ }
+
synchronized (jobTransitionLock) {
if (primaryThread!=null) {
- if (interruptPrimaryThread) {
+ if (interruptPrimaryThreadOverride) {
if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
primaryThread.interrupt();
}
- cancel = true;
+ result = true;
}
}
- return cancel;
+
+ return result;
}
@Override
@@ -309,7 +326,7 @@
}
if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
- cancel(true, false, false);
+ cancel(TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, false);
}
result.add(Tasks.getError(secondaryJob));
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
deleted file mode 100644
index 72a5ae4..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.util.core.task;
-
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-
-public class ExecutionUtils {
- /**
- * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
- * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable)
- * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Object invoke(Object callable, Object ...args) {
- if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
- if (callable instanceof Callable) {
- try {
- return ((Callable<?>)callable).call();
- } catch (Throwable t) {
- throw Throwables.propagate(t);
- }
- }
- if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
- if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
- if (callable==null) return null;
- throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
- }
-}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
index 4ce56d1..cbc474c 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
@@ -21,15 +21,26 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
import com.google.common.util.concurrent.ListenableFuture;
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
+/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the responsibility to:
* <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel */
+ * <li> invoke the listeners on cancel
+ *
+ * @deprecated since 0.9.0 likely to leave the public API */
+@Deprecated // TODO just one subclass, it can hold the behaviour we need from this,
+// and the methods here are surprising as they expect the caller to notify the list
public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
+ private static final Logger log = LoggerFactory.getLogger(ListenableForwardingFuture.class);
+
+ // TODO these are never accessed or used
final ExecutionList listeners;
protected ListenableForwardingFuture(Future<T> delegate) {
@@ -42,9 +53,22 @@
this.listeners = list;
}
+ private static boolean warned = false;
+
@Override
public void addListener(Runnable listener, Executor executor) {
+ if (!warned) {
+ log.warn("Use of deprecated ListenableForwardingFuture.addListener at "+this+" (future calls will not be logged)", new Throwable("stack trace"));
+ warned = true;
+ }
+
listeners.add(listener, executor);
}
+ public abstract boolean cancel(TaskCancellationMode mode);
+
+ public final boolean cancel(boolean mayInterrupt) { // honour the flag, as Future#cancel requires and as BasicTask#cancel(boolean) does
+ return cancel(mayInterrupt ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS : TaskCancellationMode.DO_NOT_INTERRUPT);
+ }
+
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index c1ad4f8..219f4f8 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -43,7 +43,7 @@
*/
// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten,
// reduce external assumptions about internal structure, and clarify "done" semantics
-public class ScheduledTask extends BasicTask {
+public class ScheduledTask extends BasicTask<Object> {
final Callable<Task<?>> taskFactory;
@@ -84,7 +84,7 @@
this(MutableMap.of(), task);
}
- public ScheduledTask(Map flags, final Task<?> task){
+ public ScheduledTask(Map<?,?> flags, final Task<?> task){
this(flags, new Callable<Task<?>>(){
@Override
public Task<?> call() throws Exception {
@@ -92,7 +92,7 @@
}});
}
- public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
+ public ScheduledTask(Map<?,?> flags, Callable<Task<?>> taskFactory) {
super(flags);
this.taskFactory = taskFactory;
@@ -194,13 +194,11 @@
}
@Override
- public synchronized boolean cancel(boolean mayInterrupt) {
- boolean result = super.cancel(mayInterrupt);
+ protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) {
if (nextRun!=null) {
- nextRun.cancel(mayInterrupt);
- notifyAll();
+ ((TaskInternal<?>)nextRun).cancel(mode);
}
- return result;
+ return super.doCancel(mode);
}
/**
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
index 2bf0fec..99c2773 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
@@ -29,6 +29,7 @@
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
+import com.google.common.base.Objects;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
@@ -95,6 +96,8 @@
Object getExtraStatusText();
+ /** On task completion (or cancellation) runs the listeners which have been registered using
+ * {@link #addListener(Runnable, java.util.concurrent.Executor)}. */
void runListeners();
void setEndTimeUtc(long val);
@@ -120,5 +123,41 @@
/** if a task is a proxy for another one (used mainly for internal tasks),
* this returns the "real" task represented by this one */
Task<?> getProxyTarget();
+
+ /** clearer semantics around cancellation; may later be promoted to {@link Task} */
+ @Beta
+ public boolean cancel(TaskCancellationMode mode);
+
+ @Beta
+ public static class TaskCancellationMode {
+ public static final TaskCancellationMode DO_NOT_INTERRUPT = new TaskCancellationMode(false, false, false);
+ public static final TaskCancellationMode INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS = new TaskCancellationMode(true, false, false);
+ public static final TaskCancellationMode INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS = new TaskCancellationMode(true, true, false);
+ public static final TaskCancellationMode INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS = new TaskCancellationMode(true, true, true);
+
+ private final boolean allowedToInterruptTask,
+ allowedToInterruptDependentSubmittedTasks,
+ allowedToInterruptAllSubmittedTasks;
+
+ private TaskCancellationMode(boolean mayInterruptIfRunning, boolean interruptSubmittedTransients, boolean interruptAllSubmitted) {
+ this.allowedToInterruptTask = mayInterruptIfRunning;
+ this.allowedToInterruptDependentSubmittedTasks = interruptSubmittedTransients;
+ this.allowedToInterruptAllSubmittedTasks = interruptAllSubmitted;
+ }
+
+ public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; }
+ /** Implementation-dependent what "dependent" means in this context,
+ * e.g. may be linked to a "transient" tag (that's what Brooklyn does) */
+ public boolean isAllowedToInterruptDependentSubmittedTasks() { return allowedToInterruptDependentSubmittedTasks; }
+ public boolean isAllowedToInterruptAllSubmittedTasks() { return allowedToInterruptAllSubmittedTasks; }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("interruptTask", allowedToInterruptTask)
+ .add("interruptDependentSubmitted", allowedToInterruptDependentSubmittedTasks)
+ .add("interruptAllSubmitted", allowedToInterruptAllSubmittedTasks)
+ .toString();
+ }
+ }
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
index 8e46002..d8d3764 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
@@ -60,4 +60,20 @@
return "displayNameMatches("+matcher+")";
}
}
+
+ public static Predicate<Task<?>> isDone() {
+ return new IsDone();
+ }
+
+ private static class IsDone implements Predicate<Task<?>> {
+ @Override
+ public boolean apply(Task<?> input) {
+ return input.isDone();
+ }
+ @Override
+ public String toString() {
+ return "isDone()";
+ }
+ }
+
}
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
index a291c53..e8e7890 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
@@ -70,7 +70,6 @@
if (em != null) em.shutdownNow();
}
- @SuppressWarnings("unchecked")
@Test
public void testScheduledTaskExecutedAfterDelay() throws Exception {
int delay = 100;
@@ -95,7 +94,6 @@
assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay);
}
- @SuppressWarnings("unchecked")
@Test
public void testScheduledTaskExecutedAtRegularPeriod() throws Exception {
final int period = 100;
@@ -127,7 +125,6 @@
}
}
- @SuppressWarnings("unchecked")
@Test
public void testCanCancelScheduledTask() throws Exception {
final int period = 1;
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
index ceff29f..364870a 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
@@ -30,10 +30,13 @@
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.math.MathPredicates;
import org.apache.brooklyn.util.time.CountdownTimer;
@@ -49,6 +52,8 @@
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -154,7 +159,17 @@
}
public Task<String> sayTask(String message, Duration duration, String message2) {
- return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
+ return Tasks.<String>builder().displayName("say:"+message).body(sayCallable(message, duration, message2)).build();
+ }
+ /** Builds (but does not itself submit) a task whose body submits {@code task} to {@code ec} and returns that task's result. */
+ public <T> Task<T> submitting(final Task<T> task) {
+ return Tasks.<T>builder().displayName("submitting:"+task.getId()).body(new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ ec.submit(task);
+ return task.get();
+ }
+ }).build();
}
@Test
@@ -207,6 +222,85 @@
// but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
Assert.assertEquals(cancellations.availablePermits(), 0);
}
+ /** Runs doTestCancellationModeAndSubmitted(isSubtaskTransient, cancellationMode, expectedTaskInterrupted, expectedSubtaskCancelled) per mode and cancel variant. */
+ @Test
+ public void testCancellationModeAndSubmitted() throws Exception {
+ doTestCancellationModeAndSubmitted(true, TaskCancellationMode.DO_NOT_INTERRUPT, false, false);
+
+ doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+ doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, true);
+ doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, true, false);
+
+ // a non-transient subtask should only be cancelled in the "all submitted" mode
+ doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, false);
+ doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+
+ // with no mode given (the no-arg cancel()), behaviour should match TASK_AND_DEPENDENT, i.e. non-transient background-submitted tasks are not cancelled
+ doTestCancellationModeAndSubmitted(true, null, true, true);
+ doTestCancellationModeAndSubmitted(false, null, true, false);
+ // and cancel(true) should behave the same way
+ doTestCancellationModeAndSubmitted(true, true, true, true);
+ doTestCancellationModeAndSubmitted(false, true, true, false);
+
+ // cancel(false) should behave the same as DO_NOT_INTERRUPT
+ doTestCancellationModeAndSubmitted(true, false, false, false);
+ }
+ /** Cancels a parallel task (one branch of which submits a separate subtask) in the given mode, then asserts which tasks ended. */
+ public void doTestCancellationModeAndSubmitted(
+ boolean isSubtaskTransient,
+ Object cancellationMode,
+ boolean expectedTaskInterrupted,
+ boolean expectedSubtaskCancelled
+ ) throws Exception {
+ tearDown(); setUp();
+
+ final Task<String> submitted = sayTask("1-wait", Duration.minutes(10), "1-done");
+ if (isSubtaskTransient) {
+ BrooklynTaskTags.addTagDynamically(submitted, BrooklynTaskTags.TRANSIENT_TASK_TAG);
+ }
+
+ final Task<List<?>> parent = Tasks.parallel(
+ submitting(submitted),
+ sayTask("2-wait", Duration.minutes(10), "2-done"));
+ ec.submit(parent);
+
+ waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT);
+ Asserts.assertEquals(MutableSet.copyOf(messages), MutableSet.of("1-wait", "2-wait"));
+
+ if (cancellationMode instanceof TaskCancellationMode) {
+ ((TaskInternal<?>)parent).cancel((TaskCancellationMode)cancellationMode);
+ } else if (cancellationMode instanceof Boolean) {
+ parent.cancel((Boolean)cancellationMode);
+ } else if (cancellationMode==null) {
+ ((TaskInternal<?>)parent).cancel();
+ } else {
+ throw new IllegalStateException("Invalid cancellationMode: "+cancellationMode);
+ }
+
+ // whatever the mode, the cancelled parent must report done and cancelled straight away
+ Assert.assertTrue(parent.isDone());
+ Assert.assertTrue(parent.isCancelled());
+ // the end time may lag by a fraction of a second; it is expected to stay unset when the task is not interrupted
+ if (!expectedTaskInterrupted) {
+ Assert.assertTrue(parent.getEndTimeUtc() < 0, "Wrong end time: "+parent.getEndTimeUtc());
+ } else {
+ Asserts.eventually(new Supplier<Number>() {
+ @Override public Number get() { return parent.getEndTimeUtc(); }},
+ MathPredicates.<Number>greaterThanOrEqual(0));
+ }
+
+ if (!expectedSubtaskCancelled) {
+ Time.sleep(Duration.millis(5));
+ Assert.assertFalse(submitted.isCancelled());
+ Assert.assertFalse(submitted.isDone());
+ } else {
+ Asserts.eventually(Suppliers.ofInstance(submitted), TaskPredicates.isDone());
+ Assert.assertTrue(submitted.isCancelled());
+ Asserts.eventually(new Supplier<Number>() {
+ @Override public Number get() { return submitted.getEndTimeUtc(); }},
+ MathPredicates.<Number>greaterThanOrEqual(0));
+ }
+ }
protected void waitForMessages(Predicate<? super List<String>> predicate, Duration timeout) throws Exception {
long endtime = System.currentTimeMillis() + timeout.toMilliseconds();
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
index 1d1c3af..6fd3021 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
@@ -64,6 +64,11 @@
protected TaskInternal<T> delegate() {
return delegate;
}
+ /** Forwards cancellation in the given mode to the wrapped delegate task. */
+ @Override
+ public boolean cancel(TaskCancellationMode mode) {
+ return delegate.cancel(mode);
+ }
}
private BasicExecutionManager em;
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 1d551e8..5c11355 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -45,7 +45,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
-@SuppressWarnings({"unchecked","rawtypes"})
+@SuppressWarnings({"rawtypes"})
public class ScheduledExecutionTest {
public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
index fce6f0f..8a25361 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
@@ -57,8 +57,8 @@
.body(Callables.<Object>returning("val"))
.displayName("myname")
.build());
- assertTrue(TaskPredicates.displayNameMatches(Predicates.equalTo("myname")).apply(task));
- assertFalse(TaskPredicates.displayNameMatches(Predicates.equalTo("wrong")).apply(task));
+ assertTrue(TaskPredicates.displayNameSatisfies(Predicates.equalTo("myname")).apply(task));
+ assertFalse(TaskPredicates.displayNameSatisfies(Predicates.equalTo("wrong")).apply(task));
}
@Test
diff --git a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
index e483d0b..a6d3b8e 100644
--- a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
+++ b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.config.ConfigKey;
@@ -35,6 +36,7 @@
import org.apache.brooklyn.rest.transform.EntityTransformer;
import org.apache.brooklyn.rest.util.WebResourceUtils;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
@@ -71,14 +73,32 @@
public Map<String, Object> batchConfigRead(String application, String entityToken, Boolean raw) {
// TODO: add test
Entity entity = brooklyn().getEntity(application, entityToken);
- Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
- Map<String, Object> result = Maps.newLinkedHashMap();
- for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
- Object value = ek.getValue();
- result.put(ek.getKey().getName(),
- resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+ // wrap in a task for better runtime view
+ return Entities.submit(entity, Tasks.<Map<String,Object>>builder().displayName("REST API batch config read").body(new BatchConfigRead(this, entity, raw)).build()).getUnchecked();
+ }
+ /** Callable that performs the batch config read for one entity, so the read can be run as a task. */
+ private static class BatchConfigRead implements Callable<Map<String,Object>> {
+ private final EntityConfigResource resource;
+ private final Entity entity;
+ private final Boolean raw;
+
+ public BatchConfigRead(EntityConfigResource resource, Entity entity, Boolean raw) {
+ this.resource = resource;
+ this.entity = entity;
+ this.raw = raw;
}
- return result;
+ /** Resolves each value in the entity's config bag with a zero timeout, returning them keyed by config key name. */
+ @Override
+ public Map<String, Object> call() throws Exception {
+ Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
+ Map<String, Object> result = Maps.newLinkedHashMap();
+ for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
+ Object value = ek.getValue();
+ result.put(ek.getKey().getName(),
+ resource.resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+ }
+ return result;
+ }
}
@Override