Fix PeriodicEffectorPolicy
diff --git a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
index a2a64b9..065b432 100644
--- a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
+++ b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
@@ -219,7 +219,7 @@
     
     @Override
     public void clearCallHistory() {
-        callHistory.clear();;
+        callHistory.clear();
     }
     
     public static class TestEntityWithoutEnrichers extends TestEntityImpl {
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
index f4635bc..b61e012 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -40,6 +40,7 @@
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
 import org.apache.brooklyn.util.core.task.Tasks;
@@ -53,9 +54,9 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 
 @Beta
@@ -101,7 +102,7 @@
 
     public static final ConfigKey<Boolean> RUNNING = ConfigKeys.builder(Boolean.class)
             .name("running")
-            .description("Set if the executor has started")
+            .description("[INTERNAL] Set if the executor has started")
             .defaultValue(Boolean.FALSE)
             .reconfigurable(true)
             .build();
@@ -109,7 +110,7 @@
     public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
             .name("scheduled")
             .description("List of all scheduled execution start times")
-            .defaultValue(Lists.newCopyOnWriteArrayList())
+            .defaultValue(ImmutableList.of())
             .reconfigurable(true)
             .build();
 
@@ -126,6 +127,12 @@
         setup();
     }
 
+    @Override
+    public void rebind() {
+        // Called before setEntity; therefore don't do any real work here that might cause us to reference the entity
+        setup();
+    }
+
     public void setup() {
         if (executor != null) {
             executor.shutdownNow();
@@ -140,29 +147,39 @@
 
         effector = getEffector();
 
+        if (Boolean.TRUE.equals(config().get(RUNNING))) {
+            running.set(true);
+            resubmitOnResume();
+        }
+
         AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
-        subscriptions().subscribe(entity, sensor, this);
+        subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sensor, this);
     }
 
     @Override
-    public void rebind() {
-        setup();
-
-        if (config().get(RUNNING)) {
-            running.set(true);
-
-            List<Long> scheduled = config().get(SCHEDULED);
-            for (Long when : scheduled) {
-                Duration wait = Duration.millis(when - System.currentTimeMillis());
-                if (wait.isPositive()) {
-                    schedule(wait);
-                } else {
-                    scheduled.remove(when);
-                }
-            }
+    public void resume() {
+        super.resume();
+        
+        if (running.get()) {
+            resubmitOnResume();
         }
     }
-
+    
+    protected List<Long> resubmitOnResume() {
+        List<Long> scheduled = config().get(SCHEDULED);
+        List<Long> updatedScheduled = MutableList.copyOf(scheduled);
+        for (Long when : scheduled) {
+            Duration wait = Duration.millis(when - System.currentTimeMillis());
+            if (wait.isPositive()) {
+                scheduleInExecutor(wait);
+            } else {
+                updatedScheduled.remove(when);
+            }
+        }
+        config().set(SCHEDULED, updatedScheduled);
+        return updatedScheduled;
+    }
+    
     @Override
     protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
         if (key.isReconfigurable()) {
@@ -178,13 +195,14 @@
         super.destroy();
     }
 
+    
     public abstract void start();
 
     protected Effector<?> getEffector() {
         String effectorName = config().get(EFFECTOR);
         Maybe<Effector<?>> effector = getEntity().getEntityType().getEffectorByName(effectorName);
         if (effector.isAbsentOrNull()) {
-            throw new IllegalStateException("Cannot find effector " + effectorName);
+            throw new IllegalStateException("Cannot find effector " + effectorName + " on entity " + getEntity());
         }
         return effector.get();
     }
@@ -196,8 +214,7 @@
         try {
             Calendar now = Calendar.getInstance();
             Calendar when = Calendar.getInstance();
-            boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
-            Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000);
+            Date parsed = parseTime(time);
             when.setTime(parsed);
             when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
             if (when.before(now)) {
@@ -210,16 +227,35 @@
         }
     }
 
+    protected Date parseTime(String time) throws ParseException {
+        boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
+        if (formatted) {
+            synchronized (FORMATTER) {
+                // DateFormat is not thread-safe; docs say to use one-per-thread, or to synchronize externally
+                return FORMATTER.parse(time);
+            }
+        } else {
+            return new Date(Long.parseLong(time) * 1000);
+        }
+    }
+    
     protected void schedule(Duration wait) {
-        List<Long> scheduled = config().get(SCHEDULED);
+        List<Long> scheduled = MutableList.copyOf(config().get(SCHEDULED));
         scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+        config().set(SCHEDULED, scheduled);
 
+        scheduleInExecutor(wait);
+    }
+
+    private void scheduleInExecutor(Duration wait) {
         executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
     }
 
     @Override
     public synchronized void run() {
         if (effector == null) return;
+        if (!(isRunning() && getManagementContext().isRunning())) return;
+
         try {
             ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
             Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
@@ -233,8 +269,8 @@
             Object result = entity.invoke(effector, resolved).getUnchecked();
             LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
         } catch (RuntimeInterruptedException rie) {
-            Thread.interrupted();
-            // TODO sometimes this seems to hang the executor?
+            // Gracefully stop
+            Thread.currentThread().interrupt();
         } catch (Throwable t) {
             LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
             Exceptions.propagate(t);
@@ -246,7 +282,7 @@
         LOG.debug("{}: Got event {}", this, event);
         AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
         if (event.getSensor().getName().equals(sensor.getName())) {
-            Boolean start = (Boolean) event.getValue();
+            Boolean start = Boolean.TRUE.equals(event.getValue());
             if (start && running.compareAndSet(false, true)) {
                 config().set(RUNNING, true);
                 start();
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
index 0c2de9f..dafc69c 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -86,28 +86,30 @@
     }
 
     @Override
-    public void rebind() {
-        super.rebind();
-
-        // Check if we missed an entire period
-        List<Long> scheduled = config().get(SCHEDULED);
-        if (running.get() && scheduled.isEmpty()) {
+    protected List<Long> resubmitOnResume() {
+        List<Long> scheduled = super.resubmitOnResume();
+        
+        if (scheduled.isEmpty()) {
+            // We missed an entire period; re-calculate (rather than relying on run's finally block)
             start();
         }
+        return scheduled;
     }
-
+    
     @Override
     public synchronized void run() {
         try {
             super.run();
         } finally {
-            Duration period = config().get(PERIOD);
-            String time = config().get(TIME);
-            if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
-                schedule(period);
-            } else {
-                Duration wait = getWaitUntil(time);
-                schedule(wait.upperBound(period));
+            if (isRunning() && getManagementContext().isRunning()) {
+                Duration period = config().get(PERIOD);
+                String time = config().get(TIME);
+                if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+                    schedule(period);
+                } else {
+                    Duration wait = getWaitUntil(time);
+                    schedule(wait.upperBound(period));
+                }
             }
         }
     }
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
index 5a3bed7..a2a5818 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -31,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * A {@link Policy} the executes an {@link Effector} at a specific time in the future.
@@ -59,8 +60,8 @@
     public void setEntity(final EntityLocal entity) {
         super.setEntity(entity);
 
-        subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this);
-        subscriptions().subscribe(entity, INVOKE_AT, this);
+        subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_IMMEDIATELY, this);
+        subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_AT, this);
     }
 
     @Override
@@ -89,7 +90,7 @@
                 }
             }
             if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
-                Boolean invoke = (Boolean) event.getValue();
+                Boolean invoke = Boolean.TRUE.equals(event.getValue());
                 if (invoke) {
                     schedule(Duration.ZERO);
                 }
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java
new file mode 100644
index 0000000..dd0fed3
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.objs.Configurable;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class AbstractEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+    protected static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
+    protected <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) {
+        Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val));
+    }
+
+    protected void assertCallHistoryNeverContinually(TestEntity entity, String effector) {
+        Asserts.continually(() -> entity.getCallHistory(), l -> !l.contains(effector));
+    }
+
+    protected void assertCallHistoryContainsEventually(TestEntity entity, String effector) {
+        assertCallHistoryEventually(entity, effector, 1);
+    }
+
+    protected void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                int size = getCallHistoryCount(entity, effector);
+                assertTrue(size >= minSize, "size="+size);
+            }});
+    }
+    
+    protected void assertCallsStopEventually(TestEntity entity, String effector) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                int size1 = getCallHistoryCount(entity, effector);
+                Asserts.succeedsContinually(ImmutableMap.of("timeout", Duration.millis(100)), new Runnable() {
+                    public void run() {
+                        int size2 = getCallHistoryCount(entity, effector);
+                        assertEquals(size1, size2);
+                    }});
+            }});
+    }
+
+    protected int getCallHistoryCount(TestEntity entity, String effector) {
+        List<String> callHistory = entity.getCallHistory();
+        synchronized (callHistory) {
+            return Iterables.size(Iterables.filter(callHistory, Predicates.equalTo("myEffector")));
+        }
+    }
+}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
index 0268d60..a2aa4a6 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -22,22 +22,17 @@
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
-
-    private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+public class PeriodicEffectorPolicyTest extends AbstractEffectorPolicyTest {
 
     @Test
     public void testPeriodicEffectorFires() {
@@ -55,20 +50,19 @@
         Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
 
         entity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
-        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
-        int calls = entity.getCallHistory().size();
-        Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+        assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true);
+        assertCallHistoryEventually(entity, "myEffector", 2);
     }
 
-    @Test
+    // Integration because of long wait
+    @Test(groups="Integration")
     public void testPeriodicEffectorFiresAfterDelay() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
                         .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
                         .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
-                        .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+                        .configure(PeriodicEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
                         .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
@@ -77,20 +71,53 @@
         Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
 
         entity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
-        sleep(Duration.seconds(5));
-        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
-        sleep(Duration.seconds(5));
-        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
-        int calls = entity.getCallHistory().size();
-        Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+        assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true);
+        assertCallHistoryNeverContinually(entity, "myEffector");
+        
+        Time.sleep(Duration.seconds(5));
+        assertCallHistoryEventually(entity, "myEffector", 2);
     }
 
-    private void sleep(Duration duration) {
-        try {
-            Thread.sleep(duration.toMilliseconds());
-        } catch (InterruptedException ie) {
-            Exceptions.propagate(ie);  
-        }
+    @Test
+    public void testSuspendsAndResumes() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+                        .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+                        .configure(PeriodicEffectorPolicy.TIME, "immediately")
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        entity.sensors().set(START, Boolean.TRUE);
+        assertCallHistoryContainsEventually(entity, "myEffector");
+        
+        policy.suspend();
+        assertCallsStopEventually(entity, "myEffector");
+        entity.clearCallHistory();
+        
+        policy.resume();
+        assertCallHistoryContainsEventually(entity, "myEffector");
+    }
+    
+    @Test
+    public void testSuspendsAndResumeBeforeTriggered() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+                        .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+                        .configure(PeriodicEffectorPolicy.TIME, "immediately")
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        policy.suspend();
+        policy.resume();
+        assertCallHistoryNeverContinually(entity, "myEffector");
+        
+        entity.sensors().set(START, Boolean.TRUE);
+        assertCallHistoryContainsEventually(entity, "myEffector");
     }
 }
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
index 5271de3..e6947b2 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -22,22 +22,17 @@
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
-
-    private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+public class ScheduledEffectorPolicyTest extends AbstractEffectorPolicyTest {
 
     @Test
     public void testScheduledEffectorFiresImmediately() {
@@ -54,17 +49,18 @@
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
         entity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
-        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+        assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+        assertCallHistoryContainsEventually(entity, "myEffector");
     }
 
-    @Test
+    // Integration because of long wait
+    @Test(groups="Integration")
     public void testScheduledEffectorFiresAfterDelay() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
                         .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
-                        .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+                        .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
                         .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
@@ -73,11 +69,33 @@
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
         entity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
-        sleep(Duration.seconds(5));
-        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
-        sleep(Duration.seconds(5));
-        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+        assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+        assertCallHistoryNeverContinually(entity, "myEffector");
+
+        Time.sleep(Duration.seconds(5));
+        assertCallHistoryContainsEventually(entity, "myEffector");
+    }
+
+    // Integration because of long wait
+    @Test(groups="Integration")
+    public void testSuspendsAndResumes() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+                        .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
+                        .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        entity.sensors().set(START, Boolean.TRUE);
+        assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+        
+        policy.suspend();
+        policy.resume();
+
+        Time.sleep(Duration.seconds(5));
+        assertCallHistoryContainsEventually(entity, "myEffector");
     }
 
     @Test
@@ -94,19 +112,10 @@
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
         entity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
-        sleep(Duration.seconds(5));
-        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+        assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+        assertCallHistoryNeverContinually(entity, "myEffector");
 
         entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE);
-        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
-    }
-
-    private void sleep(Duration duration) {
-        try {
-            Thread.sleep(duration.toMilliseconds());
-        } catch (InterruptedException ie) {
-            Exceptions.propagate(ie);  
-        }
+        assertCallHistoryContainsEventually(entity, "myEffector");
     }
 }
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
index 825ac31..2fdc80c 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
@@ -18,12 +18,18 @@
  */
 package org.apache.brooklyn.policy.action;
 
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.objs.Configurable;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
 import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
@@ -53,20 +59,16 @@
                         .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
 
         origEntity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector"));
+        assertCallHistoryContainsEventually(origEntity, "myEffector");
 
-        Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(origPolicy);
-        newApp = rebind();
-        ((AbstractPolicy) origPolicy).destroy();
-        TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
-        Asserts.assertNotNull(newEntity);
-        Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(newPolicy);
+        newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
 
-        Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+        TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+        Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
+
+        assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
         int calls = newEntity.getCallHistory().size();
-        Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500));
+        assertCallHistoryEventually(newEntity, "myEffector", calls + 2);
     }
 
     @Test
@@ -75,25 +77,20 @@
                 .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
                         .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
-                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.seconds(1))
+                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.millis(100))
                         .configure(PeriodicEffectorPolicy.TIME, "immediately")
                         .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
 
         origEntity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector"));
+        assertCallHistoryContainsEventually(origEntity, "myEffector");
 
-        Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(origPolicy);
-        newApp = rebind();
-        ((AbstractPolicy) origPolicy).destroy();
-        TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
-        Asserts.assertNotNull(newEntity);
-        Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(newPolicy);
+        newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
 
-        Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
-        int calls = newEntity.getCallHistory().size();
-        Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 5));
+        TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+        Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
+
+        assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
+        assertCallHistoryContainsEventually(newEntity, "myEffector");
     }
 
     @Test
@@ -106,23 +103,35 @@
                         .configure(PeriodicEffectorPolicy.TIME, "immediately")
                         .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
 
-        Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(origPolicy);
-        newApp = rebind();
-        ((AbstractPolicy) origPolicy).destroy();
-        TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
-        Asserts.assertNotNull(newEntity);
-        Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
-        Asserts.assertNotNull(newPolicy);
+        newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        
+        TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+        Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
 
         Asserts.assertFalse(newPolicy.config().get(PeriodicEffectorPolicy.RUNNING));
         Asserts.assertFalse(newEntity.getCallHistory().contains("myEffector"));
-        Asserts.assertFalse(origEntity.getCallHistory().contains("myEffector"));
 
         newEntity.sensors().set(START, Boolean.TRUE);
-        Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
-        Asserts.eventually(() -> newEntity.getCallHistory(), l -> l.contains("myEffector"));
-        int calls = newEntity.getCallHistory().size();
-        Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500));
+        assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
+        assertCallHistoryEventually(newEntity, "myEffector", 2);
+    }
+    
+    private <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) {
+        Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val));
+    }
+
+    private void assertCallHistoryContainsEventually(TestEntity entity, String effector) {
+        assertCallHistoryEventually(entity, effector, 1);
+    }
+
+    private void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                List<String> callHistory = entity.getCallHistory();
+                synchronized (callHistory) {
+                    int size = Iterables.size(Iterables.filter(callHistory, Predicates.equalTo("myEffector")));
+                    assertTrue(size >= minSize, "size="+size);
+                }
+            }});
     }
 }