Fix PeriodicEffectorPolicy
diff --git a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
index a2a64b9..065b432 100644
--- a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
+++ b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java
@@ -219,7 +219,7 @@
@Override
public void clearCallHistory() {
- callHistory.clear();;
+ callHistory.clear();
}
public static class TestEntityWithoutEnrichers extends TestEntityImpl {
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
index f4635bc..b61e012 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -40,6 +40,7 @@
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
import org.apache.brooklyn.util.core.task.Tasks;
@@ -53,9 +54,9 @@
import com.google.common.annotations.Beta;
import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
@Beta
@@ -101,7 +102,7 @@
public static final ConfigKey<Boolean> RUNNING = ConfigKeys.builder(Boolean.class)
.name("running")
- .description("Set if the executor has started")
+ .description("[INTERNAL] Set if the executor has started")
.defaultValue(Boolean.FALSE)
.reconfigurable(true)
.build();
@@ -109,7 +110,7 @@
public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
.name("scheduled")
.description("List of all scheduled execution start times")
- .defaultValue(Lists.newCopyOnWriteArrayList())
+ .defaultValue(ImmutableList.of())
.reconfigurable(true)
.build();
@@ -126,6 +127,12 @@
setup();
}
+ @Override
+ public void rebind() {
+ // Called before setEntity; therefore don't do any real work here that might cause us to reference the entity
+ setup();
+ }
+
public void setup() {
if (executor != null) {
executor.shutdownNow();
@@ -140,29 +147,39 @@
effector = getEffector();
+ if (Boolean.TRUE.equals(config().get(RUNNING))) {
+ running.set(true);
+ resubmitOnResume();
+ }
+
AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
- subscriptions().subscribe(entity, sensor, this);
+ subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sensor, this);
}
@Override
- public void rebind() {
- setup();
-
- if (config().get(RUNNING)) {
- running.set(true);
-
- List<Long> scheduled = config().get(SCHEDULED);
- for (Long when : scheduled) {
- Duration wait = Duration.millis(when - System.currentTimeMillis());
- if (wait.isPositive()) {
- schedule(wait);
- } else {
- scheduled.remove(when);
- }
- }
+ public void resume() {
+ super.resume();
+
+ if (running.get()) {
+ resubmitOnResume();
}
}
-
+
+ protected List<Long> resubmitOnResume() {
+ List<Long> scheduled = config().get(SCHEDULED);
+ List<Long> updatedScheduled = MutableList.copyOf(scheduled);
+ for (Long when : scheduled) {
+ Duration wait = Duration.millis(when - System.currentTimeMillis());
+ if (wait.isPositive()) {
+ scheduleInExecutor(wait);
+ } else {
+ updatedScheduled.remove(when);
+ }
+ }
+ config().set(SCHEDULED, updatedScheduled);
+ return updatedScheduled;
+ }
+
@Override
protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
if (key.isReconfigurable()) {
@@ -178,13 +195,14 @@
super.destroy();
}
+
public abstract void start();
protected Effector<?> getEffector() {
String effectorName = config().get(EFFECTOR);
Maybe<Effector<?>> effector = getEntity().getEntityType().getEffectorByName(effectorName);
if (effector.isAbsentOrNull()) {
- throw new IllegalStateException("Cannot find effector " + effectorName);
+ throw new IllegalStateException("Cannot find effector " + effectorName + " on entity " + getEntity());
}
return effector.get();
}
@@ -196,8 +214,7 @@
try {
Calendar now = Calendar.getInstance();
Calendar when = Calendar.getInstance();
- boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
- Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000);
+ Date parsed = parseTime(time);
when.setTime(parsed);
when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
if (when.before(now)) {
@@ -210,16 +227,35 @@
}
}
+ protected Date parseTime(String time) throws ParseException {
+ boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
+ if (formatted) {
+ synchronized (FORMATTER) {
+ // DateFormat is not thread-safe; docs say to use one-per-thread, or to synchronize externally
+ return FORMATTER.parse(time);
+ }
+ } else {
+ return new Date(Long.parseLong(time) * 1000);
+ }
+ }
+
protected void schedule(Duration wait) {
- List<Long> scheduled = config().get(SCHEDULED);
+ List<Long> scheduled = MutableList.copyOf(config().get(SCHEDULED));
scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+ config().set(SCHEDULED, scheduled);
+ scheduleInExecutor(wait);
+ }
+
+ private void scheduleInExecutor(Duration wait) {
executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
}
@Override
public synchronized void run() {
if (effector == null) return;
+ if (!(isRunning() && getManagementContext().isRunning())) return;
+
try {
ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
@@ -233,8 +269,8 @@
Object result = entity.invoke(effector, resolved).getUnchecked();
LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
} catch (RuntimeInterruptedException rie) {
- Thread.interrupted();
- // TODO sometimes this seems to hang the executor?
+ // Gracefully stop
+ Thread.currentThread().interrupt();
} catch (Throwable t) {
LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
Exceptions.propagate(t);
@@ -246,7 +282,7 @@
LOG.debug("{}: Got event {}", this, event);
AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
if (event.getSensor().getName().equals(sensor.getName())) {
- Boolean start = (Boolean) event.getValue();
+ Boolean start = Boolean.TRUE.equals(event.getValue());
if (start && running.compareAndSet(false, true)) {
config().set(RUNNING, true);
start();
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
index 0c2de9f..dafc69c 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -86,28 +86,30 @@
}
@Override
- public void rebind() {
- super.rebind();
-
- // Check if we missed an entire period
- List<Long> scheduled = config().get(SCHEDULED);
- if (running.get() && scheduled.isEmpty()) {
+ protected List<Long> resubmitOnResume() {
+ List<Long> scheduled = super.resubmitOnResume();
+
+ if (scheduled.isEmpty()) {
+ // We missed an entire period; re-calculate (rather than relying on run's finally block)
start();
}
+ return scheduled;
}
-
+
@Override
public synchronized void run() {
try {
super.run();
} finally {
- Duration period = config().get(PERIOD);
- String time = config().get(TIME);
- if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
- schedule(period);
- } else {
- Duration wait = getWaitUntil(time);
- schedule(wait.upperBound(period));
+ if (isRunning() && getManagementContext().isRunning()) {
+ Duration period = config().get(PERIOD);
+ String time = config().get(TIME);
+ if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+ schedule(period);
+ } else {
+ Duration wait = getWaitUntil(time);
+ schedule(wait.upperBound(period));
+ }
}
}
}
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
index 5a3bed7..a2a5818 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableMap;
/**
* A {@link Policy} the executes an {@link Effector} at a specific time in the future.
@@ -59,8 +60,8 @@
public void setEntity(final EntityLocal entity) {
super.setEntity(entity);
- subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this);
- subscriptions().subscribe(entity, INVOKE_AT, this);
+ subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_IMMEDIATELY, this);
+ subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_AT, this);
}
@Override
@@ -89,7 +90,7 @@
}
}
if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
- Boolean invoke = (Boolean) event.getValue();
+ Boolean invoke = Boolean.TRUE.equals(event.getValue());
if (invoke) {
schedule(Duration.ZERO);
}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java
new file mode 100644
index 0000000..dd0fed3
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.objs.Configurable;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class AbstractEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+ protected static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
+ protected <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) {
+ Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val));
+ }
+
+ protected void assertCallHistoryNeverContinually(TestEntity entity, String effector) {
+ Asserts.continually(() -> entity.getCallHistory(), l -> !l.contains(effector));
+ }
+
+ protected void assertCallHistoryContainsEventually(TestEntity entity, String effector) {
+ assertCallHistoryEventually(entity, effector, 1);
+ }
+
+ protected void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) {
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ int size = getCallHistoryCount(entity, effector);
+ assertTrue(size >= minSize, "size="+size);
+ }});
+ }
+
+ protected void assertCallsStopEventually(TestEntity entity, String effector) {
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ int size1 = getCallHistoryCount(entity, effector);
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", Duration.millis(100)), new Runnable() {
+ public void run() {
+ int size2 = getCallHistoryCount(entity, effector);
+ assertEquals(size1, size2);
+ }});
+ }});
+ }
+
+ protected int getCallHistoryCount(TestEntity entity, String effector) {
+ List<String> callHistory = entity.getCallHistory();
+ synchronized (callHistory) {
+ return Iterables.size(Iterables.filter(callHistory, Predicates.equalTo("myEffector")));
+ }
+ }
+}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
index 0268d60..a2aa4a6 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -22,22 +22,17 @@
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
import org.testng.annotations.Test;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
-
- private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+public class PeriodicEffectorPolicyTest extends AbstractEffectorPolicyTest {
@Test
public void testPeriodicEffectorFires() {
@@ -55,20 +50,19 @@
Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
entity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
- Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
- int calls = entity.getCallHistory().size();
- Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+ assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true);
+ assertCallHistoryEventually(entity, "myEffector", 2);
}
- @Test
+ // Integration because of long wait
+ @Test(groups="Integration")
public void testPeriodicEffectorFiresAfterDelay() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(PeriodicEffectorPolicy.class)
.configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
.configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
.configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
- .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+ .configure(PeriodicEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
.configure(PeriodicEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
@@ -77,20 +71,53 @@
Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
entity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
- sleep(Duration.seconds(5));
- Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
- sleep(Duration.seconds(5));
- Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
- int calls = entity.getCallHistory().size();
- Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+ assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true);
+ assertCallHistoryNeverContinually(entity, "myEffector");
+
+ Time.sleep(Duration.seconds(5));
+ assertCallHistoryEventually(entity, "myEffector", 2);
}
- private void sleep(Duration duration) {
- try {
- Thread.sleep(duration.toMilliseconds());
- } catch (InterruptedException ie) {
- Exceptions.propagate(ie);
- }
+ @Test
+ public void testSuspendsAndResumes() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+ .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+ .configure(PeriodicEffectorPolicy.TIME, "immediately")
+ .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ entity.sensors().set(START, Boolean.TRUE);
+ assertCallHistoryContainsEventually(entity, "myEffector");
+
+ policy.suspend();
+ assertCallsStopEventually(entity, "myEffector");
+ entity.clearCallHistory();
+
+ policy.resume();
+ assertCallHistoryContainsEventually(entity, "myEffector");
+ }
+
+ @Test
+ public void testSuspendsAndResumeBeforeTriggered() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+ .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+ .configure(PeriodicEffectorPolicy.TIME, "immediately")
+ .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ policy.suspend();
+ policy.resume();
+ assertCallHistoryNeverContinually(entity, "myEffector");
+
+ entity.sensors().set(START, Boolean.TRUE);
+ assertCallHistoryContainsEventually(entity, "myEffector");
}
}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
index 5271de3..e6947b2 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -22,22 +22,17 @@
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
import org.testng.annotations.Test;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
-
- private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+public class ScheduledEffectorPolicyTest extends AbstractEffectorPolicyTest {
@Test
public void testScheduledEffectorFiresImmediately() {
@@ -54,17 +49,18 @@
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
entity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
- Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+ assertCallHistoryContainsEventually(entity, "myEffector");
}
- @Test
+ // Integration because of long wait
+ @Test(groups="Integration")
public void testScheduledEffectorFiresAfterDelay() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(ScheduledEffectorPolicy.class)
.configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
.configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
- .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+ .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
.configure(ScheduledEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
@@ -73,11 +69,33 @@
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
entity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
- sleep(Duration.seconds(5));
- Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
- sleep(Duration.seconds(5));
- Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+ assertCallHistoryNeverContinually(entity, "myEffector");
+
+ Time.sleep(Duration.seconds(5));
+ assertCallHistoryContainsEventually(entity, "myEffector");
+ }
+
+ // Integration because of long wait
+ @Test(groups="Integration")
+ public void testSuspendsAndResumes() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+ .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS)
+ .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ entity.sensors().set(START, Boolean.TRUE);
+ assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+
+ policy.suspend();
+ policy.resume();
+
+ Time.sleep(Duration.seconds(5));
+ assertCallHistoryContainsEventually(entity, "myEffector");
}
@Test
@@ -94,19 +112,10 @@
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
entity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
- sleep(Duration.seconds(5));
- Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+ assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true);
+ assertCallHistoryNeverContinually(entity, "myEffector");
entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE);
- Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
- }
-
- private void sleep(Duration duration) {
- try {
- Thread.sleep(duration.toMilliseconds());
- } catch (InterruptedException ie) {
- Exceptions.propagate(ie);
- }
+ assertCallHistoryContainsEventually(entity, "myEffector");
}
}
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
index 825ac31..2fdc80c 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java
@@ -18,12 +18,18 @@
*/
package org.apache.brooklyn.policy.action;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+
import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.objs.Configurable;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
@@ -53,20 +59,16 @@
.configure(PeriodicEffectorPolicy.START_SENSOR, START)));
origEntity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector"));
+ assertCallHistoryContainsEventually(origEntity, "myEffector");
- Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(origPolicy);
- newApp = rebind();
- ((AbstractPolicy) origPolicy).destroy();
- TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
- Asserts.assertNotNull(newEntity);
- Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(newPolicy);
+ newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
- Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+ TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+ Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
+
+ assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
int calls = newEntity.getCallHistory().size();
- Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500));
+ assertCallHistoryEventually(newEntity, "myEffector", calls + 2);
}
@Test
@@ -75,25 +77,20 @@
.policy(PolicySpec.create(PeriodicEffectorPolicy.class)
.configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
.configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
- .configure(PeriodicEffectorPolicy.PERIOD, Duration.seconds(1))
+ .configure(PeriodicEffectorPolicy.PERIOD, Duration.millis(100))
.configure(PeriodicEffectorPolicy.TIME, "immediately")
.configure(PeriodicEffectorPolicy.START_SENSOR, START)));
origEntity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector"));
+ assertCallHistoryContainsEventually(origEntity, "myEffector");
- Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(origPolicy);
- newApp = rebind();
- ((AbstractPolicy) origPolicy).destroy();
- TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
- Asserts.assertNotNull(newEntity);
- Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(newPolicy);
+ newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
- Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
- int calls = newEntity.getCallHistory().size();
- Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 5));
+ TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+ Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
+
+ assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
+ assertCallHistoryContainsEventually(newEntity, "myEffector");
}
@Test
@@ -106,23 +103,35 @@
.configure(PeriodicEffectorPolicy.TIME, "immediately")
.configure(PeriodicEffectorPolicy.START_SENSOR, START)));
- Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(origPolicy);
- newApp = rebind();
- ((AbstractPolicy) origPolicy).destroy();
- TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull();
- Asserts.assertNotNull(newEntity);
- Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
- Asserts.assertNotNull(newPolicy);
+ newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
+
+ TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+ Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class));
Asserts.assertFalse(newPolicy.config().get(PeriodicEffectorPolicy.RUNNING));
Asserts.assertFalse(newEntity.getCallHistory().contains("myEffector"));
- Asserts.assertFalse(origEntity.getCallHistory().contains("myEffector"));
newEntity.sensors().set(START, Boolean.TRUE);
- Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
- Asserts.eventually(() -> newEntity.getCallHistory(), l -> l.contains("myEffector"));
- int calls = newEntity.getCallHistory().size();
- Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500));
+ assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true);
+ assertCallHistoryEventually(newEntity, "myEffector", 2);
+ }
+
+ private <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) {
+ Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val));
+ }
+
+ private void assertCallHistoryContainsEventually(TestEntity entity, String effector) {
+ assertCallHistoryEventually(entity, effector, 1);
+ }
+
+ private void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) {
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ List<String> callHistory = entity.getCallHistory();
+ synchronized (callHistory) {
+ int size = Iterables.size(Iterables.filter(callHistory, Predicates.equalTo("myEffector")));
+ assertTrue(size >= minSize, "size="+size);
+ }
+ }});
}
}