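Add more DependentConfiguration wait modes (timeout_if_on_fire, timeout_if_on_fire_initial, timeout_if_down_initial)
This ensures that an entity which is briefly on-fire does not immediately abort attributeWhenReady waits made by items that are started concurrently.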
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
index 99ac222..6cb5e5a 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
@@ -33,10 +33,12 @@
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
@@ -57,7 +59,7 @@
import static org.testng.Assert.*;
public class ConfigYamlTest extends AbstractYamlTest {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class);
final static String DOUBLE_MAX_VALUE_TIMES_TEN = "" + Double.MAX_VALUE + "0";
@@ -68,7 +70,7 @@
@Override
public void setUp() throws Exception {
super.setUp();
-
+
executor = Executors.newCachedThreadPool();
}
@@ -106,18 +108,18 @@
assertNull(entity.getMyField()); // field with @SetFromFlag
assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias
}
-
+
@Test
public void testRecursiveConfigFailsGracefully() throws Exception {
doTestRecursiveConfigFailsGracefully(false);
}
-
+
@Test
public void testRecursiveConfigImmediateFailsGracefully() throws Exception {
doTestRecursiveConfigFailsGracefully(true);
}
-
+
protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception {
String yaml = Joiner.on("\n").join(
"services:",
@@ -178,18 +180,18 @@
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
-
+
assertEquals(entity.config().get(TestEntity.CONF_NAME), "myName"); // confName has @SetFromFlag("confName"); using full name
assertEquals(entity.config().get(TestEntity.CONF_OBJECT), "myObj"); // confObject does not have @SetFromFlag
assertEquals(entity.config().get(TestEntity.CONF_STRING), "myString"); // set using the @SetFromFlag alias
-
- // The "dynamic" config key (i.e. not defined on the entity's type) is not picked up to
+
+ // The "dynamic" config key (i.e. not defined on the entity's type) is not picked up to
// be set on the entity if it's not inside the "brooklyn.config" block. This isn't exactly
// desired behaviour, but it is what happens! This test is more to demonstrate the behaviour
- // than to say it is definitely what we want! But like the comment at the start of the
+ // than to say it is definitely what we want! But like the comment at the start of the
// method says, this style is discouraged so we don't really care.
assertNull(entity.config().get(ConfigKeys.newStringConfigKey("test.confDynamic"))); // not defined on entity
-
+
// Again this isn't exactly desired behaviour, just a demonstration of what happens!
// The names used in YAML correspond to fields with @SetFromFlag. The values end up in the
// {@link EntitySpec#config} rather than {@link EntitySpec#flags}. The field is not set.
@@ -212,13 +214,13 @@
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_PLAIN), ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_LIST_PLAIN), ImmutableList.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_PLAIN), ImmutableSet.of("myval"));
}
-
+
/**
* This tests config keys of type {@link org.apache.brooklyn.core.config.MapConfigKey}, etc.
* It sets the value all in one go (as opposed to explicit sub-keys).
@@ -244,7 +246,7 @@
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING), ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mykey", "myval"));
@@ -253,7 +255,7 @@
assertEquals(entity.config().get(TestEntity.CONF_SET_THING), ImmutableSet.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_OBJ_THING), ImmutableSet.of("myval"));
}
-
+
/**
* This tests config keys of type {@link org.apache.brooklyn.core.config.MapConfigKey}, etc.
* It sets the value of each sub-key explicitly, rather than all in one go.
@@ -273,7 +275,7 @@
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING), ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mykey", "myval"));
@@ -282,7 +284,7 @@
assertEquals(entity.config().get(TestEntity.CONF_SET_THING), ImmutableSet.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_OBJ_THING), ImmutableSet.of("myval"));
}
-
+
@Test
public void testDeferredSupplierToConfig() throws Exception {
String yaml = Joiner.on("\n").join(
@@ -307,7 +309,7 @@
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
-
+
assertEquals(entity.config().get(TestEntity.CONF_NAME), "myOther");
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING), ImmutableMap.of("mykey", "myOther"));
assertEquals(entity.config().get(TestEntity.CONF_LIST_THING), ImmutableList.of("myOther"));
@@ -316,7 +318,7 @@
assertEquals(entity.config().get(TestEntity.CONF_LIST_PLAIN), ImmutableList.of("myOther"));
assertEquals(entity.config().get(TestEntity.CONF_SET_PLAIN), ImmutableSet.of("myOther"));
}
-
+
@Test
public void testDeferredSupplierToAttributeWhenReady() throws Exception {
String yaml = Joiner.on("\n").join(
@@ -342,7 +344,7 @@
// Non-blocking calls will now return with the value
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), "myOther");
}
-
+
/**
* This tests config keys of type {@link org.apache.brooklyn.core.config.MapConfigKey}, etc.
* For plain maps, see {@link #testDeferredSupplierToAttributeWhenReadyInPlainCollections()}
@@ -387,10 +389,10 @@
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_LIST_THING).get(), ImmutableList.of("myOther"));
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_SET_THING).get(), ImmutableSet.of("myOther"));
}
-
+
/**
* This tests config keys of type {@link java.util.Map}, etc.
- * For special types (e.g. {@link org.apache.brooklyn.core.config.MapConfigKey}), see
+ * For special types (e.g. {@link org.apache.brooklyn.core.config.MapConfigKey}), see
* {@link #testDeferredSupplierToAttributeWhenReadyInPlainCollections()}.
*/
@Test
@@ -453,8 +455,8 @@
@Test
public void testAttributeWhenReadyOptionsBasicOnOtherEntity() throws Exception {
- String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { timeout: 10ms } ] } ] }";
- String v1 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { \"timeout\": \"10ms\" } ] } ] }";
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { timeout: 10ms, timeout_if_on_fire: 0 } ] } ] }";
+ String v1 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { \"timeout\": \"10ms\", \"timeout_if_on_fire\": 0 } ] } ] }";
String yaml = Joiner.on("\n").join(
"services:",
@@ -484,8 +486,9 @@
sw = Stopwatch.createStarted();
Asserts.assertFailsWith(() -> entity1.config().get(TestEntity.CONF_NAME),
Asserts.expectedFailureContainsIgnoreCase("Cannot resolve", "$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config test.confName",
-// "Unsatisfied after ",
- "Abort due to", "on-fire"));
+// "Unsatisfied after "
+ "Abort due to"
+ , "on-fire"));
Asserts.assertThat(Duration.of(sw.elapsed()), d -> d.isShorterThan(Duration.millis(999)));
// and source code
@@ -571,8 +574,8 @@
@Test
public void testAttributeWhenReadyOptionsTimeoutIfDownResetsAndAbortsIfOnFire() throws Exception {
- // was 10ms, but that is too short as there are 10ms sleeps while stopping; 50ms is better
- String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 50ms } ] } ] }";
+ // was 10ms, but that is too short as there are 10ms sleeps while stopping
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), { attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 250ms, timeout_if_on_fire: 0 } ] } ] }";
String yaml = Joiner.on("\n").join(
"services:",
@@ -589,8 +592,9 @@
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
Stopwatch sw = Stopwatch.createStarted();
new Thread(()->{
- Time.sleep(Duration.millis(10));
+ Time.sleep(Duration.millis(10)); // 100 to force detection after start
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+// Time.sleep(Duration.millis(100)); // + additional delay to force timer running
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); // will clear the timeout
Time.sleep(Duration.millis(10));
@@ -676,6 +680,12 @@
}
@Test
+ public void testAttributeWhenReadyOptionsCoercion() throws Exception {
+ DependentConfiguration.AttributeWhenReadyOptions o = TypeCoercions.coerce(DependentConfiguration.AttributeWhenReadyOptions.allowingOnFireMap(), DependentConfiguration.AttributeWhenReadyOptions.class);
+ Asserts.assertFalse(o.abort_if_on_fire);
+ }
+
+ @Test
public void testConfigGoodNumericCoercions() throws Exception {
String yaml = Joiner.on("\n").join(
"services:",
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index bc669ee..57c7ce5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -25,7 +25,6 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -38,9 +37,6 @@
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
@@ -72,12 +68,10 @@
import org.apache.brooklyn.util.guava.Maybe.Absent;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.net.Urls;
-import org.apache.brooklyn.util.text.StringFunctions;
import org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,24 +157,38 @@
}
public static class AttributeWhenReadyOptions {
+ public Duration timeout;
+
@JsonAlias("timeoutIfDown")
public Duration timeout_if_down;
- public Duration timeout;
+ @JsonAlias("timeoutIfDownInitial")
+ public Duration timeout_if_down_initial;
+
+ @JsonAlias("timeoutIfOnFire")
+ public Duration timeout_if_on_fire;
+ @JsonAlias("timeoutIfOnFireInitial")
+ public Duration timeout_if_on_fire_initial;
@JsonAlias("abortIfOnFire")
public boolean abort_if_on_fire = true;
@JsonAlias("waitForTruthy")
public boolean wait_for_truthy = true;
+ // we might want an additional wait_on_timeout_conditions to prevent resolution if any of the timeout conditions are true;
+ // that could be used to prevent a value from an on-fire or down entity being used;
+ // not generally needed though as call pattern can be to wait on service lifecycle being explicitly RUNNING, eg via latches
+
public static AttributeWhenReadyOptions defaultOptions() {
AttributeWhenReadyOptions result = new AttributeWhenReadyOptions();
result.abort_if_on_fire = true;
+ result.timeout_if_on_fire = Duration.ZERO;
+ result.timeout_if_on_fire_initial = Duration.seconds(15); // plenty of time for concurrently started dependencies to transition to started
result.timeout_if_down = Duration.ONE_MINUTE;
return result;
}
public static Map allowingOnFireMap() {
- return MutableMap.of("timeout", "forever");
+ return MutableMap.of("timeout", "forever", "abort_if_on_fire", false);
}
}
@@ -227,7 +235,7 @@
*/
@Deprecated
public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Closure<V> postProcess) {
- return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
+ return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.functionFromClosure(postProcess));
}
@SuppressWarnings("unchecked")
@@ -245,19 +253,23 @@
return builder.build();
}
+ @Deprecated // since 1.1 use builder
public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) {
- return waitInTaskForAttributeReady(source, sensor, ready, ImmutableList.<AttributeAndSensorCondition<?>>of());
+ return waitInTaskForAttributeReady(source, sensor, ready, ImmutableList.of());
}
+ @Deprecated // since 1.1 use builder
public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions) {
- String blockingDetails = "Waiting for ready from "+source+" "+sensor+" (subscription)";
- return waitInTaskForAttributeReady(source, sensor, ready, abortConditions, blockingDetails);
+ return (T) waitInTaskForAttributeReady(source, sensor, ready, (List) abortConditions, "Waiting for ready from "+source+" "+sensor+" (subscription)");
}
// TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp)
- public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
- return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call();
+ @Deprecated // since 1.1 use builder
+ public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition> abortConditions, String blockingDetails) {
+ Builder<T,T> b = builder().attributeWhenReadyNoOptions(source, sensor).readiness(ready).blockingDetails(blockingDetails).timeout(Duration.PRACTICALLY_FOREVER);
+ if (abortConditions!=null) abortConditions.forEach(c -> b.abortIf(c.source, c.sensor, c.predicate));
+ return new WaitInTaskForAttributeReady<>(b).call();
}
protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> {
@@ -271,8 +283,7 @@
protected final Entity source;
protected final AttributeSensor<T> sensor;
protected final Predicate<? super T> ready;
- protected final List<AttributeAndSensorCondition<?>> abortSensorConditions;
- protected List<Pair<AttributeAndSensorCondition<Object>,Duration>> timeoutIfTimeoutSensorConditions = null;
+ protected List<AttributeAndSensorConditionWithTimeouts<Object>> timeoutIfTimeoutSensorConditions;
protected final String blockingDetails;
protected final Function<? super T,? extends V> postProcess;
protected final Duration timeout;
@@ -285,7 +296,6 @@
this.source = builder.source;
this.sensor = builder.sensor;
this.ready = builder.readiness;
- this.abortSensorConditions = builder.abortSensorConditions;
this.timeoutIfTimeoutSensorConditions = builder.timeoutIfTimeoutSensorConditions;
this.blockingDetails = builder.blockingDetails;
this.postProcess = builder.postProcess;
@@ -295,22 +305,6 @@
this.onUnmanaged = builder.onUnmanaged;
}
- private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready,
- List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
- this.source = source;
- this.sensor = sensor;
- this.ready = ready;
- this.abortSensorConditions = abortConditions;
- this.blockingDetails = blockingDetails;
-
- this.timeout = Duration.PRACTICALLY_FOREVER;
- this.timeoutIfTimeoutSensorConditions = null;
- this.onTimeout = Maybe.absent();
- this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
- this.onUnmanaged = Maybe.absent();
- this.postProcess = null;
- }
-
@SuppressWarnings("unchecked")
protected V postProcess(T value) {
if (this.postProcess!=null) return postProcess.apply(value);
@@ -338,17 +332,27 @@
throw new RuntimeTimeoutException("Waiting not permitted");
}
- final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();
- for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
- Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
- if (abortCondition.predicate.apply(currentValue)) {
- abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
+ final List<Exception> abortImmediatelyExceptions = Lists.newCopyOnWriteArrayList();
+ Map<Integer,Duration> customTimeouts = MutableMap.of();
+ if (timeoutIfTimeoutSensorConditions!=null) {
+ for (int i=0; i<timeoutIfTimeoutSensorConditions.size(); i++) {
+ AttributeAndSensorConditionWithTimeouts<Object> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(i);
+ if (timeoutIfCondition.timeoutInitial!=null) {
+ Object currentValue = timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
+ if (timeoutIfCondition.predicate.apply(currentValue)) {
+ if (Duration.ZERO.equals(timeoutIfCondition.timeoutInitial)) {
+ abortImmediatelyExceptions.add(new Exception("Abort due to " + timeoutIfCondition + ": " + currentValue));
+ } else {
+ customTimeouts.put(i, timeoutIfCondition.timeoutInitial);
+ }
+ }
+ }
}
}
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
}
TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
@@ -357,83 +361,77 @@
if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
current+" has no entity tag ("+current.getStatusDetail(false)+")");
- final LinkedList<T> publishedValues = new LinkedList<T>();
+ final LinkedList<T> publishedValues = new LinkedList<>();
final Semaphore semaphore = new Semaphore(0); // could use Exchanger
SubscriptionHandle subscription = null;
List<SubscriptionHandle> thisWaitSubscriptions = Lists.newArrayList();
try {
- subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() {
- @Override public void onEvent(SensorEvent<T> event) {
- synchronized (publishedValues) { publishedValues.add(event.getValue()); }
- semaphore.release();
- }});
+ subscription = entity.subscriptions().subscribe(source, sensor, event -> {
+ synchronized (publishedValues) { publishedValues.add(event.getValue()); }
+ semaphore.release();
+ });
- for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
- thisWaitSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
- @Override public void onEvent(SensorEvent<Object> event) {
- if (abortCondition.predicate.apply(event.getValue())) {
- abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+event.getValue()));
- semaphore.release();
- }
- }}));
- Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
- if (abortCondition.predicate.apply(currentValue)) {
- abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
- }
- }
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
- }
+ final CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();
- CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();
-
- Map<Integer,Duration> customTimeouts = MutableMap.of();
BiConsumer<Integer,Object> checkValueAtIndex = (index, val) -> {
synchronized (customTimeouts) {
- Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
- if (timeoutIfCondition.getLeft().predicate.apply(val)) {
- if (!customTimeouts.containsKey(index)) {
- // start timer from this point
- customTimeouts.put(index, timer.getDurationElapsed().add(timeoutIfCondition.getRight()));
+ AttributeAndSensorConditionWithTimeouts<Object> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ if (timeoutIfCondition.predicate.apply(val)) {
+ if (timeoutIfCondition.timeout!=null) {
+ if (!customTimeouts.containsKey(index)) {
+ // start timer from this point
+ Duration customTimeout = Duration.ZERO.equals(timeoutIfCondition.timeout) ? Duration.ZERO : timer.getDurationElapsed().add(timeoutIfCondition.timeout);
+ if (timeoutIfCondition.timeoutInitial!=null) customTimeout = Duration.max(timeoutIfCondition.timeoutInitial, customTimeout);
+ if (Duration.ZERO.equals(customTimeout)) {
+ abortImmediatelyExceptions.add(new Exception("Abort due to " + timeoutIfCondition + ": " + val));
+ } else {
+ customTimeouts.put(index, customTimeout);
+ }
+ }
+ } else {
+ // if timeout not set, it is only enabled for 'initial'; don't do anything (only remove if condition becomes false)
}
} else {
customTimeouts.remove(index);
}
}
};
-
if (timeoutIfTimeoutSensorConditions!=null) {
for (int i=0; i<timeoutIfTimeoutSensorConditions.size(); i++) {
int index = i;
- Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ AttributeAndSensorConditionWithTimeouts<Object> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
- thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.getLeft().source, timeoutIfCondition.getLeft().sensor, new SensorEventListener<Object>() {
- @Override public void onEvent(SensorEvent<Object> event) {
- checkValueAtIndex.accept(index, event.getValue());
- }}));
+ thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.source, timeoutIfCondition.sensor, event -> {
+ checkValueAtIndex.accept(index, event.getValue());
+ semaphore.release(); // indicate that timeouts need to be checked again
+ }));
- Object val = timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
+ Object val = timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
checkValueAtIndex.accept(index, val);
}
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
+ }
}
Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
- while (true) {
+ outer: while (true) {
// check the source on initial run (could be done outside the loop)
// and also (optionally) on each iteration in case it is more recent
value = source.getAttribute(sensor);
if (ready(value)) break;
- if (timer!=null) {
- if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
- nextPeriod = timer.getDurationRemaining();
+ if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
+ nextPeriod = timer.getDurationRemaining();
+ }
+ if (timer.isExpired()) {
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
}
- if (timer.isExpired()) {
- if (onTimeout.isPresent()) return onTimeout.get();
- throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
- }
+ if (onTimeout.isPresent()) return onTimeout.get();
+ throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
}
String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
@@ -454,7 +452,13 @@
if (publishedValues.isEmpty()) break;
value = publishedValues.pop();
}
- if (ready(value)) break;
+ if (ready(value)) break outer;
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
+ }
+ }
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
}
// if unmanaged then ignore the other abort conditions
@@ -463,10 +467,6 @@
throw new NotManagedException(entity);
}
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
- }
-
Set<Map.Entry<Integer, Duration>> timeoutsHere = null;
synchronized (customTimeouts) {
if (!customTimeouts.isEmpty()) {
@@ -477,13 +477,16 @@
for (Map.Entry<Integer, Duration> entry : timeoutsHere) {
Integer index = entry.getKey();
Duration specialTimeout = entry.getValue();
- Pair<AttributeAndSensorCondition<Object>, Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ AttributeAndSensorConditionWithTimeouts<Object> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
if (timer.getDurationElapsed().isLongerThan(specialTimeout)) {
- Object val = timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
- if (timeoutIfCondition.getLeft().predicate.apply(val)) {
+ Object val = timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
+ if (timeoutIfCondition.predicate.apply(val)) {
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
+ }
if (onTimeout.isPresent()) continue;
throw new RuntimeTimeoutException("Unsatisfied after " + Duration.sinceUtc(start) + " (tighter timeout due to " +
- timeoutIfCondition.getLeft() + ", with value " + val + ")");
+ timeoutIfCondition + ", with value " + val + ")");
}
}
}
@@ -513,14 +516,14 @@
*/
@Deprecated
public static <T> Task<T> whenDone(Callable<T> job) {
- return new BasicTask<T>(MutableMap.of("tag", "whenDone", "displayName", "waiting for job"), job);
+ return new BasicTask<>(MutableMap.of("tag", "whenDone", "displayName", "waiting for job"), job);
}
/**
* Returns a {@link Task} which waits for the result of first parameter, then applies the function in the second
* parameter to it, returning that result.
- *
- * Particular useful in Entity configuration where config will block until Tasks have completed,
+ * <p>
+ * Particularly useful in Entity configuration where config will block until Tasks have completed,
* allowing for example an {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} expression to be
* passed in the first argument then transformed by the function in the second argument to generate
* the value that is used for the configuration
@@ -545,14 +548,12 @@
*/
@SuppressWarnings({ "rawtypes" })
public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) {
- return new BasicTask<T>(flags, new Callable<T>() {
- @Override
- public T call() throws Exception {
- if (!task.asTask().isSubmitted()) {
- BasicExecutionContext.getCurrentExecutionContext().submit(task);
- }
- return transformer.apply(task.asTask().get());
- }});
+ return new BasicTask<>(flags, () -> {
+ if (!task.asTask().isSubmitted()) {
+ BasicExecutionContext.getCurrentExecutionContext().submit(task);
+ }
+ return transformer.apply(task.asTask().get());
+ });
}
/** Returns a task which waits for multiple other tasks (submitting if necessary)
@@ -600,7 +601,7 @@
}
});
}
- return transform(flags, new ParallelTask<U>(tasks), transformer);
+ return transform(flags, new ParallelTask<>(tasks), transformer);
}
@@ -629,9 +630,8 @@
}
return transformMultiple(
- MutableMap.<String,String>of("displayName", "formatting '"+spec.toString()+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")),
- new Function<List<Object>, String>() {
- @Override public String apply(List<Object> input) {
+ MutableMap.of("displayName", "formatting '"+spec.toString()+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")),
+ input -> {
Iterator<?> tri = input.iterator();
Object[] vv = new Object[newArgs.length];
int i=0;
@@ -642,7 +642,7 @@
i++;
}
return String.format(vv[0].toString(), Arrays.copyOfRange(vv, 1, vv.length));
- }},
+ },
taskArgs);
}
@@ -659,7 +659,7 @@
List<Object> resolvedArgs = Lists.newArrayList();
for (Object arg : args) {
Maybe<?> argVal = resolveImmediately(arg);
- if (argVal.isAbsent()) return Maybe.Absent.castAbsent(argVal);
+ if (argVal.isAbsent()) return Absent.castAbsent(argVal);
resolvedArgs.add(argVal.get());
}
@@ -672,7 +672,7 @@
public static Maybe<String> urlEncodeImmediately(Object arg) {
Maybe<?> resolvedArg = resolveImmediately(arg);
if (resolvedArg.isAbsent()) return Absent.castAbsent(resolvedArg);
- if (resolvedArg.isNull()) return Maybe.<String>of((String)null);
+ if (resolvedArg.isNull()) return Maybe.of((String)null);
String resolvedString = resolvedArg.get().toString();
return Maybe.of(Urls.encode(resolvedString));
@@ -690,13 +690,13 @@
else if (arg instanceof TaskFactory) taskArgs.add( ((TaskFactory<TaskAdaptable<Object>>)arg).newTask() );
return transformMultiple(
- MutableMap.<String,String>of("displayName", "url-escaping '"+arg),
+ MutableMap.of("displayName", "url-escaping '"+arg),
new Function<List<Object>, String>() {
@Override
@Nullable
public String apply(@Nullable List<Object> input) {
Object resolvedArg;
- if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) resolvedArg = Iterables.getOnlyElement(input);
+ if (input != null && (arg instanceof TaskAdaptable || arg instanceof TaskFactory)) resolvedArg = Iterables.getOnlyElement(input);
else if (arg instanceof DeferredSupplier) resolvedArg = ((DeferredSupplier<?>) arg).get();
else resolvedArg = arg;
@@ -710,17 +710,14 @@
public static Task<Object> external(ManagementContext mgmt, final Object provider, final Object key) {
List<TaskAdaptable<Object>> argsNeedingAdaptation = getTaskAdaptable(provider, key);
- return Tasks.<Object>builder()
+ return Tasks.builder()
.displayName("resolving external configuration: '" + key + "' from provider '" + provider + "'")
.dynamic(false)
- .body(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- Iterator<TaskAdaptable<Object>> ai = argsNeedingAdaptation.iterator();
- return ((ManagementContextInternal)mgmt).getExternalConfigProviderRegistry().getConfig(
- resolveArgument(provider, ai),
- resolveArgument(key, ai));
- }
+ .body(() -> {
+ Iterator<TaskAdaptable<Object>> ai = argsNeedingAdaptation.iterator();
+ return ((ManagementContextInternal)mgmt).getExternalConfigProviderRegistry().getConfig(
+ resolveArgument(provider, ai),
+ resolveArgument(key, ai));
})
.build();
}
@@ -761,7 +758,7 @@
if (resolvedReplacement.isAbsent()) return Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
- String result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr).apply(resolvedSourceStr);
+ String result = new RegexReplacer(resolvedPatternStr, resolvedReplacementStr).apply(resolvedSourceStr);
return Maybe.of(result);
}
@@ -784,8 +781,8 @@
if (resolvedReplacement.isAbsent()) return Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
- RegexReplacer result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
- return Maybe.<Function<String, String>>of(result);
+ RegexReplacer result = new RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
+ return Maybe.of(result);
}
public static Task<Function<String, String>> regexReplacement(Object pattern, Object replacement) {
@@ -804,7 +801,7 @@
Integer resolvedMaxThreadsInt = TypeCoercions.coerce(resolvedMaxThreads, Integer.class);
ReleaseableLatch result = ReleaseableLatch.Factory.newMaxConcurrencyLatch(resolvedMaxThreadsInt);
- return Maybe.<ReleaseableLatch>of(result);
+ return Maybe.of(result);
}
public static Task<ReleaseableLatch> maxConcurrency(Object maxThreads) {
@@ -845,11 +842,12 @@
@Nullable
@Override
public String apply(@Nullable List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
String resolvedSource = resolveArgument(source, taskArgsIterator);
String resolvedPattern = resolveArgument(pattern, taskArgsIterator);
String resolvedReplacement = resolveArgument(replacement, taskArgsIterator);
- return new StringFunctions.RegexReplacer(resolvedPattern, resolvedReplacement).apply(resolvedSource);
+ return new RegexReplacer(resolvedPattern, resolvedReplacement).apply(resolvedSource);
}
}
@@ -866,8 +864,9 @@
@Override
public Function<String, String> apply(List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
- return new StringFunctions.RegexReplacer(resolveArgument(pattern, taskArgsIterator), resolveArgument(replacement, taskArgsIterator));
+ return new RegexReplacer(resolveArgument(pattern, taskArgsIterator), resolveArgument(replacement, taskArgsIterator));
}
}
@@ -881,6 +880,7 @@
@Override
public ReleaseableLatch apply(List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
Integer maxThreadsNum = resolveArgument(maxThreads, taskArgsIterator, Integer.class);
return ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxThreadsNum);
@@ -897,7 +897,7 @@
/**
* Resolves the argument as follows:
- *
+ * <p>
* If the argument is a DeferredSupplier, we will block and wait for it to resolve. If the argument is TaskAdaptable or TaskFactory,
* we will assume that the resolved task has been queued on the {@code taskArgsIterator}, otherwise the argument has already been resolved.
*
@@ -927,7 +927,7 @@
*/
@Deprecated
public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities, Closure<Boolean> readiness) {
- Predicate<Object> readinessPredicate = (readiness != null) ? GroovyJavaMethods.<Object>predicateFromClosure(readiness) : JavaGroovyEquivalents.groovyTruthPredicate();
+ Predicate<Object> readinessPredicate = (readiness != null) ? GroovyJavaMethods.predicateFromClosure(readiness) : JavaGroovyEquivalents.groovyTruthPredicate();
return listAttributesWhenReady(sensor, entities, readinessPredicate);
}
@@ -951,7 +951,7 @@
try {
return (T) Tasks.resolveValue(t, Object.class, ((EntityInternal)context).getExecutionContext(), contextMessage);
} catch (ExecutionException e) {
- throw Throwables.propagate(e);
+ throw Exceptions.propagate(e);
}
}
@@ -972,6 +972,20 @@
}
}
+ public static class AttributeAndSensorConditionWithTimeouts<T> extends AttributeAndSensorCondition<T> {
+ /** Timeout applied when the condition is met after the subscription is established, subject to any {@link #timeoutInitial}; if unset, the condition is only checked at start. */
+ protected final Duration timeout;
+ /** Timeout applied when the condition is already met before the subscription is established; it also acts as a minimum period,
+ * measured from subscription start, to which any {@link #timeout} is extended. E.g. if this is 5m and timeout is 1m, a failure
+ * at the 3m mark waits 2m, whereas any failure after the 4m mark waits 1m. */
+ protected final Duration timeoutInitial;
+ public AttributeAndSensorConditionWithTimeouts(Entity source, AttributeSensor<T> sensor, Predicate<? super T> predicate, Duration timeout, Duration timeoutInitial) {
+ super(source, sensor, predicate);
+ this.timeout = timeout; // stored as given; no validation is done here
+ this.timeoutInitial = timeoutInitial; // stored as given; no validation is done here
+ }
+ }
+
public static ProtoBuilder builder() {
return new ProtoBuilder();
}
@@ -1004,14 +1018,14 @@
* Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}, no timeout.
*/
public <T2> Builder<T2,T2> attributeWhenReadyNoOptions(Entity source, AttributeSensor<T2> sensor) {
- return new Builder<T2,T2>(source, sensor);
+ return new Builder<>(source, sensor);
}
/**
* Alias for {@link #attributeWhenReadyNoOptions(Entity, AttributeSensor)}
*/
public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) {
- return new Builder<T2,T2>(source, sensor);
+ return new Builder<>(source, sensor);
}
/** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity,
@@ -1023,7 +1037,7 @@
/** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */
@Beta
public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
- return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness);
+ return new MultiBuilder<>(sources, sensor, readiness);
}
}
@@ -1035,8 +1049,7 @@
protected AttributeSensor<T> sensor;
protected Predicate<? super T> readiness;
protected Function<? super T, ? extends V> postProcess;
- protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList();
- protected List<Pair<AttributeAndSensorCondition<Object>,Duration>> timeoutIfTimeoutSensorConditions = null;
+ protected List<AttributeAndSensorConditionWithTimeouts<Object>> timeoutIfTimeoutSensorConditions = null;
protected String blockingDetails;
protected Duration timeout;
protected Maybe<V> onTimeout = Maybe.absent();
@@ -1078,8 +1091,7 @@
return abortIf(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
- abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
- return this;
+ return timeoutIf(source, sensor, predicate, Duration.ZERO, Duration.ZERO);
}
/** Causes the depender to abort immediately if {@link Attributes#SERVICE_STATE_ACTUAL}
* is {@link Lifecycle#ON_FIRE}. */
@@ -1087,10 +1099,19 @@
abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
return this;
}
+ public Builder<T,V> timeoutIfOnFire(Duration time, Duration timeInitial) { // registers a timeoutIf condition on this builder's source being ON_FIRE
+ if (time==null && timeInitial==null) return this; // neither timeout supplied: register nothing
+ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE), time, timeInitial);
+ return this;
+ }
/** Causes the depender to timeout after the given time if {@link Attributes#SERVICE_STATE_ACTUAL}
* is not starting or running */
public Builder<T,V> timeoutIfDown(Duration time) {
- timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.in(MutableList.of(Lifecycle.STOPPING, Lifecycle.STOPPED, Lifecycle.DESTROYED, Lifecycle.ON_FIRE, Lifecycle.CREATED, null)), time);
+ return timeoutIfDown(time, null);
+ }
+ public Builder<T,V> timeoutIfDown(Duration time, Duration timeInitial) {
+ if (time==null && timeInitial==null) return this;
+ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.in(MutableList.of(Lifecycle.STOPPING, Lifecycle.STOPPED, Lifecycle.DESTROYED, Lifecycle.ON_FIRE, Lifecycle.CREATED, null)), time, timeInitial);
return this;
}
@@ -1101,8 +1122,17 @@
public Builder<T,V> options(AttributeWhenReadyOptions options) {
if (options!=null) {
if (options.timeout!=null) timeout(options.timeout);
- if (options.timeout_if_down != null) timeoutIfDown(options.timeout_if_down);
- if (Boolean.TRUE.equals(options.abort_if_on_fire)) abortIfOnFire();
+ timeoutIfDown(options.timeout_if_down, options.timeout_if_down_initial);
+
+ if (options.timeout_if_on_fire==null && options.timeout_if_on_fire_initial==null) {
+ if (Boolean.TRUE.equals(options.abort_if_on_fire)) {
+ AttributeWhenReadyOptions defaultOptions = AttributeWhenReadyOptions.defaultOptions();
+ timeoutIfOnFire(defaultOptions.timeout_if_on_fire, defaultOptions.timeout_if_on_fire_initial);
+ }
+ // otherwise nothing
+ } else {
+ timeoutIfOnFire(options.timeout_if_on_fire, options.timeout_if_on_fire_initial);
+ }
if (!options.wait_for_truthy) {
readiness = Predicates.alwaysTrue();
@@ -1121,9 +1151,12 @@
return this;
}
/** specifies the supplied timeout if the condition is met */
- public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate, Duration val) {
+ public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate, Duration timeout) {
+ return timeoutIf(source, sensor, predicate, timeout, timeout);
+ }
+ public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate, Duration timeout, Duration timeoutInitial) {
if (timeoutIfTimeoutSensorConditions==null) timeoutIfTimeoutSensorConditions = MutableList.of();
- timeoutIfTimeoutSensorConditions.add(Pair.of(new AttributeAndSensorCondition(source, sensor, predicate), val));
+ timeoutIfTimeoutSensorConditions.add(new AttributeAndSensorConditionWithTimeouts(source, sensor, predicate, timeout, timeoutInitial));
return this;
}
public Builder<T,V> onTimeoutReturn(V val) {
@@ -1131,7 +1164,7 @@
return this;
}
public Builder<T,V> onTimeoutThrow() {
- onTimeout = Maybe.<V>absent();
+ onTimeout = Maybe.absent();
return this;
}
public Builder<T,V> onUnmanagedReturn(V val) {
@@ -1139,7 +1172,7 @@
return this;
}
public Builder<T,V> onUnmanagedThrow() {
- onUnmanaged = Maybe.<V>absent();
+ onUnmanaged = Maybe.absent();
return this;
}
/** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required
@@ -1169,13 +1202,13 @@
.description("Waiting on sensor "+sensor.getName()+" from "+source)
.tag("attributeWhenReady")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
- .body(new WaitInTaskForAttributeReady<T,V>(this))
+ .body(new WaitInTaskForAttributeReady<>(this))
.build();
}
public V runNow() {
validate();
- return new WaitInTaskForAttributeReady<T,V>(this).call();
+ return new WaitInTaskForAttributeReady<>(this).call();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void validate() {
@@ -1208,11 +1241,11 @@
}
@Beta
protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
- builder = new Builder<T,V>(null, sensor);
+ builder = new Builder<>(null, sensor);
builder.readiness(readiness);
for (Entity s : checkNotNull(sources, "sources")) {
- multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness));
+ multiSource.add(new AttributeAndSensorCondition<>(s, sensor, readiness));
}
this.name = "waiting on "+sensor.getName();
this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness
@@ -1300,14 +1333,12 @@
} else {
return Tasks.<V2>builder().displayName(name).description(descriptionBase)
.tag("attributeWhenReady")
- .body(new Callable<V2>() {
- @Override public V2 call() throws Exception {
- List<V> prePostProgress = DynamicTasks.queue(parallelTask).get();
- return DynamicTasks.queue(
- Tasks.<V2>builder().displayName("post-processing").description("Applying "+postProcessFromMultiple)
- .body(Functionals.callable(postProcessFromMultiple, prePostProgress))
- .build()).get();
- }
+ .body(() -> {
+ List<V> prePostProgress = DynamicTasks.queue(parallelTask).get();
+ return DynamicTasks.queue(
+ Tasks.<V2>builder().displayName("post-processing").description("Applying "+postProcessFromMultiple)
+ .body(Functionals.callable(postProcessFromMultiple, prePostProgress))
+ .build()).get();
})
.build();
}
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
index ce61252..5903976 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Stopwatch;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -67,10 +68,10 @@
public class DependentConfigurationTest extends BrooklynAppUnitTestSupport {
private static final Logger log = LoggerFactory.getLogger(DependentConfigurationTest.class);
-
+
public static final int SHORT_WAIT_MS = 100;
public static final int TIMEOUT_MS = 30*1000;
-
+
private TestEntity entity;
private TestEntity entity2;
@@ -81,11 +82,11 @@
entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
}
-
+
@Test
public void testTransform() throws Exception {
Task<Integer> t = DependentConfiguration.transform(
- new BasicTask<Integer>(Callables.returning(2)),
+ new BasicTask<Integer>(Callables.returning(2)),
incrementerFunction());
submit(t);
assertEquals(t.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), Integer.valueOf(3));
@@ -97,12 +98,12 @@
return val + 1;
}};
}
-
+
@Test
public void testFormatString() throws Exception {
Task<String> t = DependentConfiguration.formatString("%s://%s:%d/",
"http",
- new BasicTask<String>(Callables.returning("localhost")),
+ new BasicTask<String>(Callables.returning("localhost")),
DependentConfiguration.transform(new BasicTask<Integer>(Callables.returning(8080)), incrementerFunction()));
submit(t);
Assert.assertEquals(t.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), "http://localhost:8081/");
@@ -157,7 +158,7 @@
public void testAttributeWhenReady() throws Exception {
final Task<String> t = submit(DependentConfiguration.attributeWhenReady(entity, TestEntity.NAME));
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertEquals(assertDoneEventually(t), "myval");
}
@@ -165,10 +166,10 @@
@Test
public void testAttributeWhenReadyWithPredicate() throws Exception {
final Task<String> t = submit(DependentConfiguration.attributeWhenReady(entity, TestEntity.NAME, Predicates.equalTo("myval2")));
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval2");
assertEquals(assertDoneEventually(t), "myval2");
}
@@ -177,7 +178,7 @@
public void testAttributeWhenReadyWithPostProcessing() throws Exception {
final Task<String> t = submit(DependentConfiguration.valueWhenAttributeReady(entity, TestEntity.SEQUENCE, Functions.toStringFunction()));
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -190,7 +191,7 @@
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -207,7 +208,7 @@
}});
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -219,7 +220,7 @@
.abortIf(entity2, TestEntity.SEQUENCE, Predicates.equalTo(1))
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertEquals(assertDoneEventually(t), "myval");
}
@@ -320,41 +321,41 @@
}
@Test
- public void testAttributeWhenReadyAbortsWhenOnFireByDefault() {
+ public void testAttributeWhenReadyAbortsWhenOnFireImmediately() {
log.info("starting test "+JavaClassNames.niceClassAndMethod());
final Task<String> t = submit(DependentConfiguration.builder()
- .attributeWhenReady(entity, TestEntity.NAME)
+ .attributeWhenReady(entity, TestEntity.NAME).abortIfOnFire()
.build());
ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
EntityAsserts.assertAttributeEqualsEventually(entity, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
-
+
try {
assertDoneEventually(t);
fail("Should have failed already!");
} catch (Throwable e) {
- if (e.toString().contains("Aborted waiting for ready"))
- return;
-
+ if (e.toString().contains("Aborted waiting for ready")) return;
+// if (e.toString().contains("Unsatisfied after")) return;
+
log.warn("Did not abort as expected: "+e, e);
Dumper.dumpInfo(entity);
-
+
throw Exceptions.propagate(e);
}
}
@Test(invocationCount=100, groups = "Integration")
public void testAttributeWhenReadyAbortsWhenOnfireByDefaultManyTimes() {
- testAttributeWhenReadyAbortsWhenOnFireByDefault();
+ testAttributeWhenReadyAbortsWhenOnFireImmediately();
}
-
+
@Test
- public void testAttributeWhenReadyAbortsWhenAlreadyOnFireByDefault() throws Exception {
+ public void testAttributeWhenReadyAbortsWhenAlreadyOnFireImmediately() throws Exception {
ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
EntityAsserts.assertAttributeEqualsEventually(entity, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
-
+
final Task<String> t = submit(DependentConfiguration.builder()
- .attributeWhenReady(entity, TestEntity.NAME)
+ .attributeWhenReady(entity, TestEntity.NAME).abortIfOnFire()
.build());
try {
@@ -366,15 +367,35 @@
}
@Test
+ public void testAttributeWhenReadyAbortsWhenAlreadyOnFireAfterMillis() throws Exception {
+ ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
+ EntityAsserts.assertAttributeEqualsEventually(entity, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ // entity is already ON_FIRE before the task exists, so the 100ms initial timeout (not the zero timeout) is expected to govern the abort
+ Stopwatch timer = Stopwatch.createStarted();
+ final Task<String> t = submit(DependentConfiguration.builder()
+ .attributeWhenReady(entity, TestEntity.NAME).timeoutIfOnFire(Duration.ZERO, Duration.millis(100))
+ .build());
+
+ try {
+ assertDoneEventually(t);
+ fail();
+ } catch (Exception e) {
+ // check the elapsed time on every failure path: previously the expected abort returned before this assertion ran
+ Asserts.assertThat(timer, tt -> Duration.of(tt).isLongerThan(Duration.millis(99)));
+ if (!e.toString().contains("Aborted waiting for ready")) throw e; // unexpected failure: surface it rather than swallow it
+ }
+ }
+
+ @Test
public void testListAttributeWhenReadyFromMultipleEntities() throws Exception {
final Task<List<String>> t = submit(DependentConfiguration.builder()
.attributeWhenReadyFromMultiple(ImmutableList.of(entity, entity2), TestEntity.NAME)
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
-
+
entity2.sensors().set(TestEntity.NAME, "myval2");
assertEquals(ImmutableSet.copyOf(assertDoneEventually(t)), ImmutableSet.of("myval", "myval2"));
}
@@ -384,11 +405,11 @@
final Task<List<String>> t = submit(DependentConfiguration.builder()
.attributeWhenReadyFromMultiple(ImmutableList.of(entity, entity2), TestEntity.NAME, StringPredicates.startsWith("myval"))
.build());
-
+
entity.sensors().set(TestEntity.NAME, "wrongval");
entity2.sensors().set(TestEntity.NAME, "wrongval2");
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
entity2.sensors().set(TestEntity.NAME, "myval2");
@@ -410,7 +431,7 @@
}
}})
.build());
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
entity2.sensors().set(TestEntity.SEQUENCE, 2);
assertEquals(assertDoneEventually(t), "1,2");
@@ -426,7 +447,7 @@
}
});
}
-
+
private <T> T assertDoneEventually(final Task<T> t) throws Exception {
final AtomicReference<ExecutionException> exception = new AtomicReference<ExecutionException>();
T result = Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Callable<T>() {
@@ -449,11 +470,11 @@
return result;
}
-
+
private <T> Task<T> submit(Task<T> task) {
return app.getExecutionContext().submit(task);
}
-
+
private <T> Task<T> submit(Callable<T> job) {
return app.getExecutionContext().submit(new BasicTask<T>(job));
}