This closes #1160
diff --git a/brooklyn-docs/guide/misc/release-notes.md b/brooklyn-docs/guide/misc/release-notes.md
index f65e77d..3661ab1 100644
--- a/brooklyn-docs/guide/misc/release-notes.md
+++ b/brooklyn-docs/guide/misc/release-notes.md
@@ -51,3 +51,8 @@
 
 For changes in prior versions, please refer to the release notes for 
 [0.8.0](/v/0.8.0-incubating/misc/release-notes.html).
+
+3. Task cancellation is now propagated to dependent submitted tasks, including backgrounded tasks if they are transient.
+Previously when a task was cancelled the API did not guarantee semantics but the behaviour was to cancel sub-tasks only 
+in very limited cases. Now the semantics are more precise and controllable, and more sub-tasks are cancelled.
+This can prevent some leaked waits on `attributeWhenReady`.
diff --git a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
index c8f1c00..42147c5 100644
--- a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
+++ b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
@@ -81,6 +81,24 @@
     public boolean isError();
 
     /**
+     * As {@link Future#isDone()}. In particular if cancelled, this will return true
+     * as soon as it is cancelled. The thread for this task may still be running,
+     * if the cancellation (often an interruption, but may be weaker) has not applied,
+     * and threads of submitted tasks may also be running depending on cancellation parameters.
+     * <p>
+     * {@link #get()} is guaranteed to return immediately, throwing in the case of cancellation
+     * prior to completion (and including the case above where a thread may still be running).
+     * <p>
+     * To check whether cancelled threads for this task have completed, 
+     * inspect {@link #getEndTimeUtc()}, which is guaranteed to be set when threads complete
+     * if the thread is started (as determinable by whether {@link #getStartTimeUtc()} is set).
+     * (The threads of submitted/child tasks will usually be independent; to determine their
+     * completion requires inspecting the {@link ExecutionManager}.)  
+     */
+    @Override
+    public boolean isDone();
+    
+    /**
      * Causes calling thread to block until the task is started.
      */
     public void blockUntilStarted();
diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index a417e32..48a0283 100644
--- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -115,7 +115,7 @@
         
         try {
             if (log.isDebugEnabled())
-                log.debug("Queuing task to resolve "+dsl);
+                log.debug("Queuing task to resolve "+dsl+", called by "+Tasks.current());
 
             EntityInternal entity = (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
             ExecutionContext exec =
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 8914ca4..eb4ff10 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -307,12 +307,16 @@
 
         /* Marked transient so that the task is not needlessly kept around at the highest level.
          * Note that the task is not normally visible in the GUI, because 
-         * (a) while it is running, the entity is parentless (and so not in the tree);
+         * (a) while it is running, the entity is often parentless (and so not in the tree);
          * and (b) when it is completed it is GC'd, as it is transient.
          * However task info is available via the API if you know its ID,
          * and if better subtask querying is available it will be picked up as a background task 
          * of the parent entity creating this child entity
          * (note however such subtasks are currently filtered based on parent entity so is excluded).
+         * <p>
+         * Some of these (initializers and enrichers) submit background scheduled tasks,
+         * which currently show up at the top level once the initializer task completes.
+         * TODO It would be nice if these scheduled tasks were grouped in a bucket!
          */
         ((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
                 .tag(BrooklynTaskTags.tagForContextEntity(entity))
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index ac4bef5..0c622c3 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -248,7 +248,7 @@
 
             // return immediately if either the ready predicate or the abort conditions hold
             if (ready(value)) return postProcess(value);
-
+            
             final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
             long start = System.currentTimeMillis();
             
@@ -790,6 +790,7 @@
                 .displayName("waiting on "+sensor.getName())
                 .description("Waiting on sensor "+sensor.getName()+" from "+source)
                 .tag("attributeWhenReady")
+                .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
                 .body(new WaitInTaskForAttributeReady<T,V>(this))
                 .build();
         }
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 13dda46..74e0ddd 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -175,18 +175,18 @@
         }
         
         final Object startCallback = properties.get("newTaskStartCallback");
-        properties.put("newTaskStartCallback", new Function<Object,Void>() {
-            public Void apply(Object it) {
+        properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
+            public Void apply(Task<?> it) {
                 registerPerThreadExecutionContext();
-                if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
+                if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
                 return null;
             }});
         
         final Object endCallback = properties.get("newTaskEndCallback");
-        properties.put("newTaskEndCallback", new Function<Object,Void>() {
-            public Void apply(Object it) {
+        properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
+            public Void apply(Task<?> it) {
                 try {
-                    if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
+                    if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
                 } finally {
                     clearPerThreadExecutionContext();
                 }
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index d90b1a1..0aab7d5 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.util.core.task;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -51,16 +52,22 @@
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ExecutionList;
@@ -368,7 +375,6 @@
         return submitSubsequentScheduledTask(flags, task);
     }
     
-    @SuppressWarnings("unchecked")
     protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
         if (!task.isDone()) {
             task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
@@ -508,9 +514,15 @@
                      */
                     if (log.isDebugEnabled()) {
                         // debug only here, because most submitters will handle failures
-                        log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
-                        if (log.isTraceEnabled())
-                            log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                        if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) {
+                            log.debug("Detected interruption on task "+task+" (rethrowing)" +
+                                (Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : ""));
+                        } else {
+                            log.debug("Exception running task "+task+" (rethrowing): "+error);
+                        }
+                        if (log.isTraceEnabled()) {
+                            log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error);
+                        }
                     }
                     throw Exceptions.propagate(error);
                 }
@@ -526,19 +538,64 @@
         }
     }
 
-    private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+    @SuppressWarnings("deprecation")
+    // TODO do we even need a listenable future here?  possibly if someone wants to interrogate the future it might
+    // be interesting, so possibly it is useful that we implement ListenableFuture...
+    private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
         private final Task<T> task;
+        private BasicExecutionManager execMgmt;
 
-        private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
+        private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
             super(delegate, list);
+            this.execMgmt = execMgmt;
             this.task = task;
         }
 
         @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
+        public boolean cancel(TaskCancellationMode mode) {
             boolean result = false;
-            if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
-            result |= super.cancel(mayInterruptIfRunning);
+            if (log.isTraceEnabled()) {
+                log.trace("CLFFT cancelling "+task+" mode "+mode);
+            }
+            if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode);
+            result |= delegate().cancel(mode.isAllowedToInterruptTask());
+            
+            if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
+                int subtasksFound=0;
+                int subtasksReallyCancelled=0;
+                
+                if (task instanceof HasTaskChildren) {
+                    for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Cancelling "+child+" on recursive cancellation of "+task);
+                        }
+                        subtasksFound++;
+                        if (((TaskInternal<?>)child).cancel(mode)) {
+                            result = true;
+                            subtasksReallyCancelled++;
+                        }
+                    }
+                }
+                // TODO this is inefficient; might want to keep an index on submitted-by
+                for (Task<?> t: execMgmt.getAllTasks()) {
+                    if (task.equals(t.getSubmittedByTask())) {
+                        if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Cancelling "+t+" on recursive cancellation of "+task);
+                            }
+                            subtasksFound++;
+                            if (((TaskInternal<?>)t).cancel(mode)) {
+                                result = true;
+                                subtasksReallyCancelled++;
+                            }
+                        }
+                    }
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled");
+                }
+            }
+            
             ((TaskInternal<?>)task).runListeners();
             return result;
         }
@@ -571,9 +628,15 @@
 
     @SuppressWarnings("unchecked")
     protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
-        if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}", 
+        if (log.isTraceEnabled()) {
+            log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}", 
                 new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(), 
-                (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
+                (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"),
+                Tasks.current() });
+            if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
+                log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)"));
+            }
+        }
         
         if (task instanceof ScheduledTask)
             return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
@@ -604,15 +667,16 @@
         } else {
             future = runner.submit(job);
         }
-        // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
-        // (and we make sure the same ExecutionList is used in the future as in the task)
-        ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
-        // doesn't matter whether the listener is added to the listenableFuture or the task,
-        // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
-        // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
-        // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
+        // SubmissionCallable (above) invokes the listeners on completion;
+        // this future allows a caller to add custom listeners
+        // (it does not notify the listeners; that's our job);
+        // except on cancel we want to listen
+        ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
+        // and we want to make sure *our* (manager) listeners are given suitable callback 
         ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
+        // NB: can the above mean multiple callbacks to TaskInternal#runListeners?
         
+        // finally expose the future to callers
         ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
         
         return task;
@@ -665,9 +729,27 @@
             PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
         }
-        ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+        invokeCallback(flags.get("newTaskStartCallback"), task);
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    // Invokes a loosely-typed task callback: Closure and Function receive the task, Callable and Runnable run with no argument, null returns null.
+    // Not ideal, such loose typing on the callback -- should prefer Function<Task,Object> -- but at least it's package-private.
+    static Object invokeCallback(Object callable, Task<?> task) {
+        if (callable instanceof Closure) return ((Closure<?>)callable).call(task);
+        if (callable instanceof Callable) {
+            try {
+                return ((Callable<?>)callable).call();
+            } catch (Throwable t) {
+                throw Exceptions.propagate(t);
+            }
+        }
+        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
+        if (callable instanceof Function) { return ((Function)callable).apply(task); }
+        if (callable==null) return null;
+        throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task);
+    }
+    
     /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
     protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
         internalAfterEnd(flags, task, false, true);
@@ -693,7 +775,7 @@
         }
         if (isEndingAllIterations) {
             incompleteTaskIds.remove(task.getId());
-            ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+            invokeCallback(flags.get("newTaskEndCallback"), task);
             ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
         }
 
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index 0c26dd1..7c29bba 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -155,7 +155,7 @@
             (Strings.isNonEmpty(displayName) ? 
                 displayName : 
                 (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
-            ":"+getId()+"]";
+            "]@"+getId();
     }
 
     @Override
@@ -196,7 +196,7 @@
     protected Maybe<Task<?>> submittedByTask;
 
     protected volatile Thread thread = null;
-    private volatile boolean cancelled = false;
+    protected volatile boolean cancelled = false;
     /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
     protected volatile Future<T> internalFuture = null;
     
@@ -288,15 +288,34 @@
     }
     
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+    public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        // semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
+        return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS
+            : TaskCancellationMode.DO_NOT_INTERRUPT);
+    }
+    
+    @Override @Beta
+    public synchronized boolean cancel(TaskCancellationMode mode) {
         if (isDone()) return false;
-        boolean cancel = true;
-        cancelled = true;
-        if (internalFuture!=null) { 
-            cancel = internalFuture.cancel(mayInterruptIfRunning);
+        if (log.isTraceEnabled()) {
+            log.trace("BT cancelling "+this+" mode "+mode);
         }
+        cancelled = true;
+        doCancel(mode);
         notifyAll();
-        return cancel;
+        return true;
+    }
+    
+    @SuppressWarnings("deprecation")
+    protected boolean doCancel(TaskCancellationMode mode) {
+        if (internalFuture!=null) { 
+            if (internalFuture instanceof ListenableForwardingFuture) {
+                return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
+            } else {
+                return internalFuture.cancel(mode.isAllowedToInterruptTask());
+            }
+        }
+        return true;
     }
 
     @Override
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
index b7985c8..51a4e34 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
@@ -158,27 +158,44 @@
     }
 
     @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-        return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
-    }
-    public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
-        if (isDone()) return false;
-        if (log.isTraceEnabled()) log.trace("cancelling {}", this);
-        boolean cancel = super.cancel(mayInterruptTask);
-        if (alsoCancelChildren) {
+    protected boolean doCancel(TaskCancellationMode mode) {
+        boolean result = false;
+        if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) {
             for (Task<?> t: secondaryJobsAll)
-                cancel |= t.cancel(mayInterruptTask);
+                result = ((TaskInternal<?>)t).cancel(mode) || result;
         }
+        return super.doCancel(mode) || result;
+        // returns true if anything is successfully cancelled
+    }
+    
+    public boolean cancel(TaskCancellationMode mode) {
+        return cancel(mode, null);
+    }
+    
+    protected boolean cancel(TaskCancellationMode mode, Boolean interruptPrimaryThreadOverride) {
+        if (isDone()) return false;
+        if (log.isTraceEnabled()) log.trace("cancelling DST {}", this);
+        
+        // first do the super's cancel, setting cancelled, and calling doCancel to cancel children
+        boolean result = super.cancel(mode);
+        // then come back and ensure our primary thread is cancelled if needed
+        
+        if (interruptPrimaryThreadOverride==null) interruptPrimaryThreadOverride = mode.isAllowedToInterruptTask();
+        if (log.isTraceEnabled()) {
+            log.trace("DST cancelling "+this+" mode "+mode+", interruptPrimary "+interruptPrimaryThreadOverride);
+        }
+
         synchronized (jobTransitionLock) {
             if (primaryThread!=null) {
-                if (interruptPrimaryThread) {
+                if (interruptPrimaryThreadOverride) {
                     if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
                     primaryThread.interrupt();
                 }
-                cancel = true;
+                result = true;
             }
         }
-        return cancel;
+        
+        return result;
     }
     
     @Override
@@ -309,7 +326,7 @@
                                         }
 
                                         if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
-                                            cancel(true, false, false);
+                                            cancel(TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, false);
                                         }
                                         
                                         result.add(Tasks.getError(secondaryJob));
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
deleted file mode 100644
index 72a5ae4..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.util.core.task;
-
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-
-public class ExecutionUtils {
-    /**
-     * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
-     * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable)
-     * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static Object invoke(Object callable, Object ...args) {
-        if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
-        if (callable instanceof Callable) {
-            try {
-                return ((Callable<?>)callable).call();
-            } catch (Throwable t) {
-                throw Throwables.propagate(t);
-            }
-        }
-        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
-        if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
-        if (callable==null) return null;
-        throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
-    }
-}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
index 4ce56d1..cbc474c 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
@@ -21,15 +21,26 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.util.concurrent.ExecutionList;
 import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
+/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the responsibility to:
  * <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel */
+ * <li> invoke the listeners on cancel
+ * 
+ * @deprecated since 0.9.0 likely to leave the public API */
+@Deprecated  // TODO just one subclass, it can hold the behaviour we need from this, 
+// and the methods here are surprising as they expect the caller to notify the list
 public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
 
+    private static final Logger log = LoggerFactory.getLogger(ListenableForwardingFuture.class);
+    
+    // TODO these are never accessed or used
     final ExecutionList listeners;
     
     protected ListenableForwardingFuture(Future<T> delegate) {
@@ -42,9 +53,22 @@
         this.listeners = list;
     }
 
+    private static boolean warned = false;
+    
     @Override
     public void addListener(Runnable listener, Executor executor) {
+        if (!warned) {
+            log.warn("Use of deprecated ListenableForwardingFuture.addListener at "+this+" (future calls will not be logged)", new Throwable("stack trace"));
+            warned = true;
+        }
+        
         listeners.add(listener, executor);
     }
     
+    public abstract boolean cancel(TaskCancellationMode mode);
+    
+    public final boolean cancel(boolean mayInterrupt) { // Future contract: cancel(false) must never interrupt; same mapping as BasicTask#cancel(boolean)
+        return cancel(mayInterrupt ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS : TaskCancellationMode.DO_NOT_INTERRUPT);
+    }
+    
 }
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index c1ad4f8..219f4f8 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -43,7 +43,7 @@
  */
 // TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, 
 // reduce external assumptions about internal structure, and clarify "done" semantics
-public class ScheduledTask extends BasicTask {
+public class ScheduledTask extends BasicTask<Object> {
     
     final Callable<Task<?>> taskFactory;
 
@@ -84,7 +84,7 @@
         this(MutableMap.of(), task);
     }
 
-    public ScheduledTask(Map flags, final Task<?> task){
+    public ScheduledTask(Map<?,?> flags, final Task<?> task){
         this(flags, new Callable<Task<?>>(){
             @Override
             public Task<?> call() throws Exception {
@@ -92,7 +92,7 @@
             }});
     }
 
-    public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
+    public ScheduledTask(Map<?,?> flags, Callable<Task<?>> taskFactory) {
         super(flags);
         this.taskFactory = taskFactory;
         
@@ -194,13 +194,11 @@
     }
     
     @Override
-    public synchronized boolean cancel(boolean mayInterrupt) {
-        boolean result = super.cancel(mayInterrupt);
+    protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) {
         if (nextRun!=null) {
-            nextRun.cancel(mayInterrupt);
-            notifyAll();
+            ((TaskInternal<?>)nextRun).cancel(mode); // pass the same mode on to nextRun, when one is set
         }
-        return result;
+        return super.doCancel(mode); // finally delegate to the superclass and return its result
     }
     
     /**
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
index 2bf0fec..99c2773 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
@@ -29,6 +29,7 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
+import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ExecutionList;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -95,6 +96,8 @@
     
     Object getExtraStatusText();
 
+    /** On task completion (or cancellation) runs the listeners which have been registered using 
+     * {@link #addListener(Runnable, java.util.concurrent.Executor)}. */
     void runListeners();
 
     void setEndTimeUtc(long val);
@@ -120,5 +123,41 @@
     /** if a task is a proxy for another one (used mainly for internal tasks),
      * this returns the "real" task represented by this one */
     Task<?> getProxyTarget();
+
+    /** Cancels this task using the given mode, for clearer semantics around cancellation; may be promoted to {@link Task} in future. */
+    @Beta
+    public boolean cancel(TaskCancellationMode mode);
+    
+    /** Flags describing what a cancellation is allowed to interrupt; instances are created only via the constants declared here. */
+    @Beta
+    public static class TaskCancellationMode {
+        public static final TaskCancellationMode DO_NOT_INTERRUPT = new TaskCancellationMode(false, false, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS = new TaskCancellationMode(true, false, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS = new TaskCancellationMode(true, true, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS = new TaskCancellationMode(true, true, true);
+        
+        private final boolean allowedToInterruptTask;
+        private final boolean allowedToInterruptDependentSubmittedTasks;
+        private final boolean allowedToInterruptAllSubmittedTasks;
+        private TaskCancellationMode(boolean interruptTask, boolean interruptDependentSubmitted, boolean interruptAllSubmitted) {
+            allowedToInterruptTask = interruptTask;
+            allowedToInterruptDependentSubmittedTasks = interruptDependentSubmitted;
+            allowedToInterruptAllSubmittedTasks = interruptAllSubmitted;
+        }
+        /** Whether this mode permits interrupting the task being cancelled. */
+        public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; }
+        /** Whether this mode permits interrupting "dependent" submitted tasks; what counts as dependent is up to
+         * the implementation, e.g. it may be linked to a "transient" tag (that's what Brooklyn does). */
+        public boolean isAllowedToInterruptDependentSubmittedTasks() { return allowedToInterruptDependentSubmittedTasks; }
+        /** Whether this mode permits interrupting all submitted tasks. */
+        public boolean isAllowedToInterruptAllSubmittedTasks() { return allowedToInterruptAllSubmittedTasks; }
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this)
+                .add("interruptTask", allowedToInterruptTask)
+                .add("interruptDependentSubmitted", allowedToInterruptDependentSubmittedTasks)
+                .add("interruptAllSubmitted", allowedToInterruptAllSubmittedTasks).toString();
+        }
+    }
     
 }
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
index 8e46002..d8d3764 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
@@ -60,4 +60,20 @@
             return "displayNameMatches("+matcher+")";
         }
     }
+    
+    /** Returns a predicate which applies {@link Task#isDone()} to the task it is given. */
+    public static Predicate<Task<?>> isDone() {
+        return new IsDone();
+    }
+    
+    /** Predicate delegating to {@link Task#isDone()}; its string form is "isDone()". */
+    private static class IsDone implements Predicate<Task<?>> {
+        @Override
+        public boolean apply(Task<?> task) {
+            return task.isDone();
+        }
+        @Override
+        public String toString() { return "isDone()"; }
+    }
+    
 }
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
index a291c53..e8e7890 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
@@ -70,7 +70,6 @@
         if (em != null) em.shutdownNow();
     }
     
-    @SuppressWarnings("unchecked")
     @Test
     public void testScheduledTaskExecutedAfterDelay() throws Exception {
         int delay = 100;
@@ -95,7 +94,6 @@
         assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testScheduledTaskExecutedAtRegularPeriod() throws Exception {
         final int period = 100;
@@ -127,7 +125,6 @@
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testCanCancelScheduledTask() throws Exception {
         final int period = 1;
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
index ceff29f..364870a 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
@@ -30,10 +30,13 @@
 
 import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.CollectionFunctionals;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.math.MathPredicates;
 import org.apache.brooklyn.util.time.CountdownTimer;
@@ -49,6 +52,8 @@
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -154,7 +159,17 @@
     }
     
     public Task<String> sayTask(String message, Duration duration, String message2) {
-        return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
+        return Tasks.<String>builder().displayName("say:"+message).body(sayCallable(message, duration, message2)).build(); // display name is derived from the first message
+    }
+    
+    /** Builds a task whose body submits the given task to {@code ec} and then returns that task's result. */
+    public <T> Task<T> submitting(final Task<T> task) {
+        final Callable<T> submitThenGet = new Callable<T>() {
+            @Override public T call() throws Exception {
+                ec.submit(task);
+                return task.get();
+            }};
+        return Tasks.<T>builder().displayName("submitting:"+task.getId()).body(submitThenGet).build();
+    }
     
     @Test
@@ -207,6 +222,85 @@
         // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
         Assert.assertEquals(cancellations.availablePermits(), 0);
     }
+    
+    @Test
+    public void testCancellationModeAndSubmitted() throws Exception {
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.DO_NOT_INTERRUPT, false, false); // args: subtask transient, mode, expect task interrupted, expect subtask cancelled
+        // with a transient subtask, the subtask is expected to be cancelled unless the mode excludes submitted tasks
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, true);
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, true, false);
+        
+        // if it's not transient, it should only be cancelled on "all submitted"
+        doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, false);
+        doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+        
+        // cancellation mode left off should be the same as TASK_AND_DEPENDENT, i.e. don't cancel non-transient bg submitted
+        doTestCancellationModeAndSubmitted(true, null, true, true);
+        doTestCancellationModeAndSubmitted(false, null, true, false);
+        // and 'true' should be the same
+        doTestCancellationModeAndSubmitted(true, true, true, true);
+        doTestCancellationModeAndSubmitted(false, true, true, false);
+        
+        // cancellation mode false should be the same as DO_NOT_INTERRUPT
+        doTestCancellationModeAndSubmitted(true, false, false, false);
+    }
+    
+    public void doTestCancellationModeAndSubmitted(
+            boolean isSubtaskTransient,
+            Object cancellationMode,
+            boolean expectedTaskInterrupted,
+            boolean expectedSubtaskCancelled
+            ) throws Exception {
+        tearDown(); setUp(); // re-run the fixture teardown/setup, since one @Test method calls this helper many times
+        
+        final Task<String> t1 = sayTask("1-wait", Duration.minutes(10), "1-done");
+        if (isSubtaskTransient) {
+            BrooklynTaskTags.addTagDynamically(t1, BrooklynTaskTags.TRANSIENT_TASK_TAG);
+        }
+        // t runs, in parallel, a wrapper task which submits t1 and a second long-waiting task
+        final Task<List<?>> t = Tasks.parallel(
+                submitting(t1),
+                sayTask("2-wait", Duration.minutes(10), "2-done"));
+        ec.submit(t);
+        // wait until both tasks have reported their first message before cancelling
+        waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT);
+        Asserts.assertEquals(MutableSet.copyOf(messages), MutableSet.of("1-wait", "2-wait"));
+        // null uses the no-arg cancel(), a Boolean uses cancel(boolean), a TaskCancellationMode uses cancel(mode)
+        if (cancellationMode==null) {
+            ((TaskInternal<?>)t).cancel();
+        } else if (cancellationMode instanceof Boolean) {
+            t.cancel((Boolean)cancellationMode);
+        } else if (cancellationMode instanceof TaskCancellationMode) {
+            ((TaskInternal<?>)t).cancel((TaskCancellationMode)cancellationMode);
+        } else {
+            throw new IllegalStateException("Invalid cancellationMode: "+cancellationMode);
+        }
+
+        // the cancelled task always reports cancelled and done
+        Assert.assertEquals(t.isDone(), true);
+        Assert.assertEquals(t.isCancelled(), true);
+        // end time might not be set for another fraction of a second
+        if (expectedTaskInterrupted) { 
+            Asserts.eventually(new Supplier<Number>() {
+                @Override public Number get() { return t.getEndTimeUtc(); }}, 
+                MathPredicates.<Number>greaterThanOrEqual(0));
+        } else {
+            Assert.assertTrue(t.getEndTimeUtc() < 0, "Wrong end time: "+t.getEndTimeUtc());
+        }
+        // the submitted task t1 is checked for cancellation only when expectedSubtaskCancelled is set
+        if (expectedSubtaskCancelled) {
+            Asserts.eventually(Suppliers.ofInstance(t1), TaskPredicates.isDone());
+            Assert.assertTrue(t1.isCancelled());
+            Asserts.eventually(new Supplier<Number>() {
+                @Override public Number get() { return t1.getEndTimeUtc(); }}, 
+                MathPredicates.<Number>greaterThanOrEqual(0));
+        } else {
+            Time.sleep(Duration.millis(5)); // presumably a brief pause so an unexpected cancellation would have time to show up
+            Assert.assertFalse(t1.isCancelled());
+            Assert.assertFalse(t1.isDone());
+        }
+    }
 
     protected void waitForMessages(Predicate<? super List<String>> predicate, Duration timeout) throws Exception {
         long endtime = System.currentTimeMillis() + timeout.toMilliseconds();
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
index 1d1c3af..6fd3021 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
@@ -64,6 +64,11 @@
         protected TaskInternal<T> delegate() {
             return delegate;
         }
+
+        @Override
+        public boolean cancel(TaskCancellationMode mode) {
+            return delegate.cancel(mode); // forward to the wrapped delegate task
+        }
     }
     
     private BasicExecutionManager em;
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 1d551e8..5c11355 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -45,7 +45,7 @@
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
-@SuppressWarnings({"unchecked","rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public class ScheduledExecutionTest {
 
     public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
index fce6f0f..8a25361 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
@@ -57,8 +57,8 @@
                 .body(Callables.<Object>returning("val"))
                 .displayName("myname")
                 .build());
-        assertTrue(TaskPredicates.displayNameMatches(Predicates.equalTo("myname")).apply(task));
-        assertFalse(TaskPredicates.displayNameMatches(Predicates.equalTo("wrong")).apply(task));
+        assertTrue(TaskPredicates.displayNameSatisfies(Predicates.equalTo("myname")).apply(task));
+        assertFalse(TaskPredicates.displayNameSatisfies(Predicates.equalTo("wrong")).apply(task));
     }
     
     @Test
diff --git a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
index e483d0b..a6d3b8e 100644
--- a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
+++ b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
@@ -22,6 +22,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.config.ConfigKey;
@@ -35,6 +36,7 @@
 import org.apache.brooklyn.rest.transform.EntityTransformer;
 import org.apache.brooklyn.rest.util.WebResourceUtils;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
@@ -71,14 +73,32 @@
     public Map<String, Object> batchConfigRead(String application, String entityToken, Boolean raw) {
         // TODO: add test
         Entity entity = brooklyn().getEntity(application, entityToken);
-        Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
-        Map<String, Object> result = Maps.newLinkedHashMap();
-        for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
-            Object value = ek.getValue();
-            result.put(ek.getKey().getName(), 
-                resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+        // wrap in a task for better runtime view: the read is submitted against the entity as a named task and its result awaited
+        return Entities.submit(entity, Tasks.<Map<String,Object>>builder().displayName("REST API batch config read").body(new BatchConfigRead(this, entity, raw)).build()).getUnchecked();
+    }
+    
+    /** Callable performing the batch config read for one entity, so that the read can be run as the body of a task. */
+    private static class BatchConfigRead implements Callable<Map<String,Object>> {
+        private final EntityConfigResource resource;
+        private final Entity entity;
+        private final Boolean raw;
+        public BatchConfigRead(EntityConfigResource resource, Entity entity, Boolean raw) {
+            this.resource = resource;
+            this.entity = entity;
+            this.raw = raw;
         }
-        return result;
+        /** Resolves every value in the entity's config bag and returns them keyed by config key name, in the bag's iteration order. */
+        @Override
+        public Map<String, Object> call() throws Exception {
+            Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
+            Map<String, Object> result = Maps.newLinkedHashMap();
+            for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
+                Object value = ek.getValue();
+                result.put(ek.getKey().getName(), 
+                    resource.resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+            }
+            return result;
+        }
     }
 
     @Override