pause most tasks submitted during entity startup, until all entities are managed

without this, new tasks and especially rebind can be affected if an adjunct is started and tries to access another entity
which hasn't yet started up; this is particularly an issue if there is a workflow which will be checkpointed,
and another persisted workflow refers to some other entity for its parent workflow
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 356533e..8052af3 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -156,12 +156,9 @@
      */
     SubscriptionManager getSubscriptionManager();
 
-    //TODO (Alex) I'm not sure the following two getXxxContext methods are needed on the interface;
-    //I expect they will only be called once, in AbstractEntity, and fully capable
-    //there of generating the respective contexts from the managers
-    //(Litmus test will be whether there is anything in FederatedManagementContext
-    //which requires a custom FederatedExecutionContext -- or whether BasicEC 
-    //works with FederatedExecutionManager)
+    // not sure the following two getXxxContext methods are desired on this interface;
+    // almost everyone should use the entity.getExecutionContext() which is a shared instance with better blocking;
+    // this should just be called to initialize and for special cases where we want to bypass that startup blocking
     /**
      * Returns an {@link ExecutionContext} instance representing tasks 
      * (from the {@link ExecutionManager}) associated with this entity, and capable 
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
index 644909b..9bb4efa 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
@@ -31,6 +31,7 @@
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.entity.TestApplication;
 import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.entity.stock.BasicApplication;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool;
@@ -38,6 +39,7 @@
 import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecParams;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
@@ -232,10 +234,9 @@
 
     @Test(groups="Integration") // because slow
     public void testSshCommandSensorPeriodicFeedServiceUpFalseDoesNotRunAtStartup() throws Exception {
-        RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse", null));
+        RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse0", null));
 
-        Stopwatch sw = Stopwatch.createStarted();
-        Entity app = createAndStartApplication(
+        BasicApplication app = (BasicApplication) createAndStartApplication(
                 "location:",
                 "  localhost:",
                 "    sshToolClass: "+RecordingSshTool.class.getName(),
@@ -248,13 +249,19 @@
                 "    brooklyn.config:",
                 "      name: mySensor",
                 "      command: myCommand",
-                "      period: 5s",
+                "      period: 2s",
                 "      onlyIfServiceUp: true");
         waitForApplicationTasks(app);
 
         VanillaSoftwareProcess entity = (VanillaSoftwareProcess) Iterables.getOnlyElement(app.getChildren());
-        EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newStringSensor("mySensor"), "myResponse");
-        Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.seconds(4)));
+        EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newStringSensor("mySensor"), "myResponse0");
+
+        // after it has run once, it shouldn't run again for 2s (plus or minus 1s tolerance here)
+        Stopwatch sw = Stopwatch.createStarted();
+        RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse1", null));
+        EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newStringSensor("mySensor"), "myResponse1");
+        Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.seconds(1)));
+        Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.seconds(3)));
     }
 
     @Test
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index c3ea682..366b9ab 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.brooklyn.api.effector.Effector;
@@ -52,6 +53,7 @@
 import org.apache.brooklyn.core.workflow.DanglingWorkflowException;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
+import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -231,10 +233,11 @@
                         this.managementContext = info.getManagementContext();
                         nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STARTING);
 
-                        if (!isReadOnly()) {
-                            nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager) managementContext.getSubscriptionManager());
-                            nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
-                        }
+                        // defer this until mgmt context started, so all other entities will be known, in case they are accessed in the tasks
+//                        if (!isReadOnly()) {
+//                            nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager) managementContext.getSubscriptionManager());
+//                            nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
+//                        }
 
                         managementContextUsable.set(true);
                         currentlyDeployed.set(true);
@@ -280,9 +283,13 @@
                         entity.onManagementStarting();
 
                         // start those policies etc which are labelled as auto-start
-                        entity.policies().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct) ((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); });
-                        entity.enrichers().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct) ((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); });
-                        entity.feeds().forEach(f -> { if (!f.isActivated()) f.start(); });
+                        BiConsumer<String,Runnable> queueTask = (name, r) -> entity.getExecutionContext().submit(name, r);
+                        entity.policies().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct)
+                            queueTask.accept("Start policy "+adj, ((EntityAdjunct.AutoStartEntityAdjunct)adj)::start); });
+                        entity.enrichers().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct)
+                            queueTask.accept("Start enricher "+adj, ((EntityAdjunct.AutoStartEntityAdjunct)adj)::start); });
+                        entity.feeds().forEach(f -> { if (!f.isActivated())
+                            queueTask.accept("Start feed "+f, f::start); });
 
                         if (AUTO_FAIL_AND_RESUME_WORKFLOWS) {
                             // resume any workflows that were dangling due to shutdown
@@ -293,7 +300,7 @@
                                     .collect(Collectors.toList());
                             if (!shutdownInterruptedWorkflows.isEmpty()) {
                                 log.debug("Discovered workflows noted as 'interrupted' on startup at "+entity+", will resume as dangling: "+shutdownInterruptedWorkflows);
-                                entity.getExecutionContext().submit(DynamicTasks.of("Resuming with failure " + shutdownInterruptedWorkflows.size() + " interrupted workflow" + (shutdownInterruptedWorkflows.size() != 1 ? "s" : ""), () -> {
+                                getManagementContext().getExecutionContext(entity).submit(DynamicTasks.of("Resuming with failure " + shutdownInterruptedWorkflows.size() + " interrupted workflow" + (shutdownInterruptedWorkflows.size() != 1 ? "s" : ""), () -> {
                                     shutdownInterruptedWorkflows.forEach(w -> {
                                         // these are backgrounded because they are expected to fail
                                         // we also have to wait until mgmt is complete
@@ -346,9 +353,7 @@
             /* on start, we want to:
              * - set derived/inherited config values (not needed, the specs should have taken care of that?)
              * - publish all queued sensors (done below)
-             * - start all queued executions 
-             *   (e.g. subscription delivery - done below? are there others and if so how are they unlocked?
-             *   curious where the "start queued tasks" logic is; must be somewhere as it all seems to have been working fine (Aug 2016)) 
+             * - start all queued executions (unpause entity's execution context, subscription delivery)
              * [in exactly this order, at each entity]
              * then subsequent sensor events and executions occur directly (no queueing)
              * 
@@ -357,7 +362,10 @@
              */                
             
             if (!isReadOnly()) {
+                nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager) managementContext.getSubscriptionManager());
                 nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForPublishing();
+                nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
+                ((BasicExecutionContext)getExecutionContext()).unpause();
             }
             
             if (!isReadOnly()) {
@@ -520,7 +528,9 @@
         if (managementContextUsable.get()) {
             synchronized (this) {
                 if (executionContext!=null) return executionContext;
-                executionContext = managementContext.getExecutionContext(entity);
+                ExecutionContext newExecutionContext = managementContext.getExecutionContext(entity);
+                ((BasicExecutionContext)newExecutionContext).pause(); // start paused, so things don't run until mgmt is started, and all entities known
+                executionContext = newExecutionContext;
                 return executionContext;
             }
         }
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
index db28461..4fb32e5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
@@ -47,6 +47,7 @@
 import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadsManager;
 import org.apache.brooklyn.core.internal.BrooklynInitialization;
 import org.apache.brooklyn.core.internal.BrooklynProperties;
@@ -360,7 +361,7 @@
     }
 
     protected <T> Task<T> runAtEntity(Entity entity, TaskAdaptable<T> task) {
-        getExecutionContext(entity).submit(task);
+        ((EntityInternal)entity).getExecutionContext().submit(task);
         if (DynamicTasks.getTaskQueuingContext()!=null) {
             // put it in the queueing context so it appears in the GUI
             // mark it inessential as this is being invoked from code,
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index dc5dfad..b0757ba 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -18,10 +18,6 @@
  */
 package org.apache.brooklyn.util.core.task;
 
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
 import java.lang.reflect.Proxy;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -29,6 +25,7 @@
 import java.util.Collections;
 import java.util.Deque;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -39,6 +36,11 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
@@ -51,6 +53,7 @@
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager.BrooklynTaskLoggingMdc;
@@ -60,6 +63,7 @@
 import org.apache.brooklyn.util.javalang.Threads;
 import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -404,17 +408,44 @@
             });
         }
 
-        if (task instanceof Task) {
-            return executionManager.submit(properties, (Task)task);
-        } else if (task instanceof Callable) {
-            return executionManager.submit(properties, (Callable)task);
-        } else if (task instanceof Runnable) {
-            return (Task<T>) executionManager.submit(properties, (Runnable)task);
-        } else {
-            throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null"));
+        return submitViaExecutionManagerOrHold(task, properties);
+    }
+
+    volatile boolean paused = false; // volatile because submitViaExecutionManagerOrHold reads it before taking the lock
+    List<Pair<Task,Map>> tasksQueuedWhilePaused = MutableList.of(); // held (task, properties) pairs; this list is also the lock guarding them
+    public void pause() { // after this call, submissions are held in tasksQueuedWhilePaused instead of being run
+        this.paused = true;
+    }
+    public void unpause() { // submits every held task, empties the queue, then lets later submissions go straight through
+        synchronized (tasksQueuedWhilePaused) {
+            tasksQueuedWhilePaused.forEach(pair -> submitWithoutCheckingPaused(pair.getLeft(), pair.getRight()));
+            tasksQueuedWhilePaused.clear();
+            this.paused = false; // cleared while still holding the lock, so a submitter's re-check under the lock cannot queue after the flush
         }
     }
 
+    private <T> Task submitViaExecutionManagerOrHold(Object task, Map properties) { // coerces the argument to a Task, then holds it if paused or submits it otherwise
+        Task taskT = null;
+        if (task instanceof Task) taskT = (Task) task;
+        else if (task instanceof TaskAdaptable) taskT = ((TaskAdaptable) task).asTask();
+        else if (task instanceof Callable) taskT = new BasicTask(properties, (Callable)task);
+        else if (task instanceof Runnable) taskT = new BasicTask(properties, (Runnable)task);
+        else throw new IllegalArgumentException("Unhandled task type: task="+ task +"; type="+(task !=null ? task.getClass() : "null"));
+
+        if (paused) { // first check is made without the lock; it is repeated below once the lock is held
+            synchronized (tasksQueuedWhilePaused) {
+                if (paused) {
+                    tasksQueuedWhilePaused.add(Pair.of(taskT, properties)); // kept with its properties until unpause() submits it
+                    return taskT; // the caller gets the task back before it has been submitted
+                }
+            }
+        }
+        return submitWithoutCheckingPaused(taskT, properties);
+    }
+    private <T> Task submitWithoutCheckingPaused(Task task, Map properties) { // passes the task straight to the execution manager, ignoring the paused flag
+        return executionManager.submit(properties, task);
+    }
+
     private String idStack(Entity target) {
         Deque<String> ids = new ArrayDeque<>();
         Entity e = target;
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
index f1e7a41..87570bc 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
@@ -626,7 +626,7 @@
         forcePersistNow(n1);
         Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
         for (Feed feed : entity.feeds().getFeeds()) {
-            assertTrue(feed.isRunning(), "Feed expected running, but it is non-running");
+            Asserts.eventually(() -> feed, Feed::isRunning, Duration.seconds(2), Duration.millis(10), "Feed expected running, but it is non-running");
         }
 
         HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
index ac9b412..cfcf407 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
@@ -29,6 +29,7 @@
 import org.apache.brooklyn.core.effector.EffectorTasks;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess;
@@ -128,6 +129,6 @@
     }
 
     ExecutionContext getEntityExecutionContext() {
-        return getManagementContext().getExecutionContext(entity);
+        return ((EntityInternal)entity).getExecutionContext();
     }
 }