blob: eb4ee249fae0b8e6d8c6e7974bde287079ebf97e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.sensor;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.JavaGroovyEquivalents;
import org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.core.task.BasicTask;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.ParallelTask;
import org.apache.brooklyn.util.core.task.TaskInternal;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
import org.apache.brooklyn.util.exceptions.CompoundRuntimeException;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.NotManagedException;
import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Functionals;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.guava.Maybe.Absent;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.net.Urls;
import org.apache.brooklyn.util.text.StringFunctions;
import org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import groovy.lang.Closure;
/** Conveniences for making tasks which run in entity {@link ExecutionContext}s, blocking on attributes from other entities, possibly transforming those;
* these {@link Task} instances are typically passed in {@link Entity#setConfig(ConfigKey, Object)}.
* <p>
* If using a lot it may be useful to:
* <pre>
* {@code
* import static org.apache.brooklyn.core.sensor.DependentConfiguration.*;
* }
* </pre>
* <p>
* Note that these methods return one-time tasks. The DslComponent methods return long-lasting pointers
* and should now normally be used instead.
*/
public class DependentConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(DependentConfiguration.class);
//not instantiable, only a static helper
private DependentConfiguration() {}
/**
* Default readiness is Groovy truth, with timeout.
* @see #attributeWhenReady(Entity, AttributeSensor, Predicate)
*/
public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor) {
return attributeWhenReady(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
/**
* @see #attributeWhenReadyAllowingOnFire(Entity, AttributeSensor, Predicate)
*/
public static <T> Task<T> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T> sensor) {
return attributeWhenReadyAllowingOnFire(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready) {
Predicate<Object> readyPredicate = (ready != null) ? GroovyJavaMethods.<Object>predicateFromClosure(ready) : JavaGroovyEquivalents.groovyTruthPredicate();
return attributeWhenReady(source, sensor, readyPredicate);
}
/** returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value;
* particular useful in Entity configuration where config will block until Tasks have a value.
* This task will fail if the entity goes on fire and timeout after 1 minute if starting or stopping,
* so is suitable for use in contexts where there is not a straightforward way to bail out in the case of those events
* (such as resolving template files as part of an upload).
* If this is not desired see {@link #attributeWhenReadyAllowingOnFire(Entity, AttributeSensor, Predicate)}.
*/
public static <T> Task<T> attributeWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) {
Builder<T, T> builder = builder().attributeWhenReady(source, sensor);
if (ready != null) builder.readiness(ready);
return builder.build();
}
/** as {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} but with no timeout and not aborting if the entity goes on fire. */
public static <T> Task<T> attributeWhenReadyAllowingOnFire(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) {
Builder<T, T> builder = builder().attributeWhenReadyAllowingOnFire(source, sensor);
if (ready != null) builder.readiness(ready);
return builder.build();
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready, Closure<V> postProcess) {
Predicate<? super T> readyPredicate = (ready != null) ? GroovyJavaMethods.predicateFromClosure(ready) : JavaGroovyEquivalents.groovyTruthPredicate();
Function<? super T, V> postProcessFunction = GroovyJavaMethods.<T,V>functionFromClosure(postProcess);
return attributePostProcessedWhenReady(source, sensor, readyPredicate, postProcessFunction);
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<V> postProcess) {
return attributePostProcessedWhenReady(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate(), GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
}
public static <T> Task<T> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, T value) {
return DependentConfiguration.<T,T>attributePostProcessedWhenReady(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate(), Functions.constant(value));
}
public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Function<? super T,V> valueProvider) {
return attributePostProcessedWhenReady(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate(), valueProvider);
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Closure<V> valueProvider) {
return attributePostProcessedWhenReady(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate(), valueProvider);
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Closure<V> postProcess) {
return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
}
@SuppressWarnings("unchecked")
public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Function<? super T,V> postProcess) {
Builder<T,T> builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor);
// messy generics here to support null postProcess; would be nice to disallow that here
Builder<T,V> builder;
if (postProcess != null) {
builder = builder1.postProcess(postProcess);
} else {
builder = (Builder<T,V>)builder1;
}
if (ready != null) builder.readiness(ready);
return builder.build();
}
public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) {
return waitInTaskForAttributeReady(source, sensor, ready, ImmutableList.<AttributeAndSensorCondition<?>>of());
}
public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions) {
String blockingDetails = "Waiting for ready from "+source+" "+sensor+" (subscription)";
return waitInTaskForAttributeReady(source, sensor, ready, abortConditions, blockingDetails);
}
// TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp)
public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call();
}
protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> {
/* This is a change since before Oct 2014. Previously it would continue to poll,
* (maybe finding a different error) if the target entity becomes unmanaged.
* Now it actively checks unmanaged by default, and still throws although it might
* now find a different problem. */
private final static boolean DEFAULT_IGNORE_UNMANAGED = false;
protected final Entity source;
protected final AttributeSensor<T> sensor;
protected final Predicate<? super T> ready;
protected final List<AttributeAndSensorCondition<?>> abortSensorConditions;
protected final String blockingDetails;
protected final Function<? super T,? extends V> postProcess;
protected final Duration timeout;
protected final Maybe<V> onTimeout;
protected final boolean ignoreUnmanaged;
protected final Maybe<V> onUnmanaged;
// TODO onError Continue / Throw / Return(V)
protected WaitInTaskForAttributeReady(Builder<T, V> builder) {
this.source = builder.source;
this.sensor = builder.sensor;
this.ready = builder.readiness;
this.abortSensorConditions = builder.abortSensorConditions;
this.blockingDetails = builder.blockingDetails;
this.postProcess = builder.postProcess;
this.timeout = builder.timeout;
this.onTimeout = builder.onTimeout;
this.ignoreUnmanaged = builder.ignoreUnmanaged;
this.onUnmanaged = builder.onUnmanaged;
}
private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready,
List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
this.source = source;
this.sensor = sensor;
this.ready = ready;
this.abortSensorConditions = abortConditions;
this.blockingDetails = blockingDetails;
this.timeout = Duration.PRACTICALLY_FOREVER;
this.onTimeout = Maybe.absent();
this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
this.onUnmanaged = Maybe.absent();
this.postProcess = null;
}
@SuppressWarnings("unchecked")
protected V postProcess(T value) {
if (this.postProcess!=null) return postProcess.apply(value);
// if no post-processing assume the types are correct
return (V) value;
}
protected boolean ready(T value) {
if (ready!=null) return ready.apply(value);
return JavaGroovyEquivalents.groovyTruth(value);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public V call() {
Preconditions.checkNotNull(source, "source");
Preconditions.checkNotNull(sensor, "sensor on "+source);
T value = source.getAttribute(sensor);
// return immediately if either the ready predicate or the abort conditions hold
if (ready(value)) return postProcess(value);
final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();
for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
if (abortCondition.predicate.apply(currentValue)) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
}
}
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
current+" has no entity tag ("+current.getStatusDetail(false)+")");
final LinkedList<T> publishedValues = new LinkedList<T>();
final Semaphore semaphore = new Semaphore(0); // could use Exchanger
SubscriptionHandle subscription = null;
List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
try {
subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() {
@Override public void onEvent(SensorEvent<T> event) {
synchronized (publishedValues) { publishedValues.add(event.getValue()); }
semaphore.release();
}});
for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
abortSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
if (abortCondition.predicate.apply(event.getValue())) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+event.getValue()));
semaphore.release();
}
}}));
Object currentValue = abortCondition.source.getAttribute(abortCondition.sensor);
if (abortCondition.predicate.apply(currentValue)) {
abortionExceptions.add(new Exception("Abort due to "+abortCondition+": "+currentValue));
}
}
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
while (true) {
// check the source on initial run (could be done outside the loop)
// and also (optionally) on each iteration in case it is more recent
value = source.getAttribute(sensor);
if (ready(value)) break;
if (timer!=null) {
if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
nextPeriod = timer.getDurationRemaining();
}
if (timer.isExpired()) {
if (onTimeout.isPresent()) return onTimeout.get();
throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
}
}
String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
try {
if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
// immediately release so we are available for the next check
semaphore.release();
// if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
semaphore.drainPermits();
}
} finally {
current.setBlockingDetails(prevBlockingDetails);
}
// check any subscribed values which have come in first
while (true) {
synchronized (publishedValues) {
if (publishedValues.isEmpty()) break;
value = publishedValues.pop();
}
if (ready(value)) break;
}
// if unmanaged then ignore the other abort conditions
if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
if (onUnmanaged.isPresent()) return onUnmanaged.get();
throw new NotManagedException(entity);
}
if (!abortionExceptions.isEmpty()) {
throw new CompoundRuntimeException("Aborted waiting for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
nextPeriod = nextPeriod.multiply(1.2).upperBound(maxPeriod);
}
if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
return postProcess(value);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
} finally {
if (subscription != null) {
entity.subscriptions().unsubscribe(subscription);
}
for (SubscriptionHandle handle : abortSubscriptions) {
entity.subscriptions().unsubscribe(handle);
}
}
}
}
/**
* Returns a {@link Task} which blocks until the given job returns, then returns the value of that job.
*
* @deprecated since 0.7; code will be moved into test utilities
*/
@Deprecated
public static <T> Task<T> whenDone(Callable<T> job) {
return new BasicTask<T>(MutableMap.of("tag", "whenDone", "displayName", "waiting for job"), job);
}
/**
* Returns a {@link Task} which waits for the result of first parameter, then applies the function in the second
* parameter to it, returning that result.
*
* Particular useful in Entity configuration where config will block until Tasks have completed,
* allowing for example an {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} expression to be
* passed in the first argument then transformed by the function in the second argument to generate
* the value that is used for the configuration
*/
public static <U,T> Task<T> transform(final Task<U> task, final Function<U,T> transformer) {
return transform(MutableMap.of("displayName", "transforming "+task), task, transformer);
}
/**
* @see #transform(Task, Function)
*
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <U,T> Task<T> transform(Task<U> task, Closure transformer) {
return transform(task, GroovyJavaMethods.functionFromClosure(transformer));
}
/**
* @see #transform(Task, Function)
*/
@SuppressWarnings({ "rawtypes" })
public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) {
return new BasicTask<T>(flags, new Callable<T>() {
@Override
public T call() throws Exception {
if (!task.asTask().isSubmitted()) {
BasicExecutionContext.getCurrentExecutionContext().submit(task);
}
return transformer.apply(task.asTask().get());
}});
}
/** Returns a task which waits for multiple other tasks (submitting if necessary)
* and performs arbitrary computation over the List of results.
* @see #transform(Task, Function) but note argument order is reversed (counterintuitive) to allow for varargs */
public static <U,T> Task<T> transformMultiple(Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) {
return transformMultiple(MutableMap.of("displayName", "transforming multiple"), transformer, tasks);
}
/**
* @see #transformMultiple(Function, TaskAdaptable...)
*
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <U,T> Task<T> transformMultiple(Closure transformer, TaskAdaptable<U> ...tasks) {
return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks);
}
/**
* @see #transformMultiple(Function, TaskAdaptable...)
*
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <U,T> Task<T> transformMultiple(Map flags, Closure transformer, TaskAdaptable<U> ...tasks) {
return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks);
}
/** @see #transformMultiple(Function, TaskAdaptable...) */
@SuppressWarnings({ "rawtypes" })
public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) {
return transformMultiple(flags, transformer, Arrays.asList(tasks));
}
@SuppressWarnings({ "rawtypes" })
public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, Collection<? extends TaskAdaptable<U>> tasks) {
if (tasks.size()==1) {
return transform(flags, Iterables.getOnlyElement(tasks), new Function<U,T>() {
@Override
@Nullable
public T apply(@Nullable U input) {
return transformer.apply(MutableList.of(input).asUnmodifiable());
}
});
}
return transform(flags, new ParallelTask<U>(tasks), transformer);
}
/** Method which returns a Future containing a string formatted using String.format,
* where the arguments can be normal objects or tasks;
* tasks will be waited on (submitted if necessary) and their results substituted in the call
* to String.format.
* <p>
* Example:
* <pre>
* {@code
* setConfig(URL, DependentConfiguration.formatString("%s:%s",
* DependentConfiguration.attributeWhenReady(target, Target.HOSTNAME),
* DependentConfiguration.attributeWhenReady(target, Target.PORT) ) );
* }
* </pre>
*/
@SuppressWarnings("unchecked")
public static Task<String> formatString(final Object spec, final Object ...args) {
List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList();
Object[] newArgs = Lists.asList(spec, args).toArray();
for (Object arg: newArgs) {
if (arg instanceof TaskAdaptable) taskArgs.add((TaskAdaptable<Object>)arg);
else if (arg instanceof TaskFactory) taskArgs.add( ((TaskFactory<TaskAdaptable<Object>>)arg).newTask() );
}
return transformMultiple(
MutableMap.<String,String>of("displayName", "formatting '"+spec.toString()+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")),
new Function<List<Object>, String>() {
@Override public String apply(List<Object> input) {
Iterator<?> tri = input.iterator();
Object[] vv = new Object[newArgs.length];
int i=0;
for (Object arg : newArgs) {
if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next();
else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get();
else vv[i] = arg;
i++;
}
return String.format(vv[0].toString(), Arrays.copyOfRange(vv, 1, vv.length));
}},
taskArgs);
}
/**
* @throws ImmediateSupplier.ImmediateUnsupportedException if cannot evaluate this in a timely manner
*/
public static Maybe<String> formatStringImmediately(final Object spec, final Object ...args) {
String pattern = "";
Maybe<?> resolvedSpec = resolveImmediately(spec);
if (resolvedSpec.isPresent()) {
pattern = resolvedSpec.get().toString();
}
List<Object> resolvedArgs = Lists.newArrayList();
for (Object arg : args) {
Maybe<?> argVal = resolveImmediately(arg);
if (argVal.isAbsent()) return Maybe.Absent.castAbsent(argVal);
resolvedArgs.add(argVal.get());
}
return Maybe.of(String.format(pattern, resolvedArgs.toArray()));
}
/**
* @throws ImmediateSupplier.ImmediateUnsupportedException if cannot evaluate this in a timely manner
*/
public static Maybe<String> urlEncodeImmediately(Object arg) {
Maybe<?> resolvedArg = resolveImmediately(arg);
if (resolvedArg.isAbsent()) return Absent.castAbsent(resolvedArg);
if (resolvedArg.isNull()) return Maybe.<String>of((String)null);
String resolvedString = resolvedArg.get().toString();
return Maybe.of(Urls.encode(resolvedString));
}
/**
* Method which returns a Future containing an escaped URL string (see {@link Urls#encode(String)}).
* The arguments can be normal objects, tasks or {@link DeferredSupplier}s.
* tasks will be waited on (submitted if necessary) and their results substituted.
*/
@SuppressWarnings("unchecked")
public static Task<String> urlEncode(final Object arg) {
List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList();
if (arg instanceof TaskAdaptable) taskArgs.add((TaskAdaptable<Object>)arg);
else if (arg instanceof TaskFactory) taskArgs.add( ((TaskFactory<TaskAdaptable<Object>>)arg).newTask() );
return transformMultiple(
MutableMap.<String,String>of("displayName", "url-escaping '"+arg),
new Function<List<Object>, String>() {
@Override
@Nullable
public String apply(@Nullable List<Object> input) {
Object resolvedArg;
if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) resolvedArg = Iterables.getOnlyElement(input);
else if (arg instanceof DeferredSupplier) resolvedArg = ((DeferredSupplier<?>) arg).get();
else resolvedArg = arg;
if (resolvedArg == null) return null;
String resolvedString = resolvedArg.toString();
return Urls.encode(resolvedString);
}
},
taskArgs);
}
protected static <T> Maybe<?> resolveImmediately(Object val) {
if (val instanceof ImmediateSupplier<?>) {
return ((ImmediateSupplier<?>)val).getImmediately();
} else if (val instanceof TaskAdaptable) {
throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
} else if (val instanceof TaskFactory) {
throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
} else if (val instanceof DeferredSupplier<?>) {
throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
} else {
return Maybe.of(val);
}
}
public static Maybe<String> regexReplacementImmediately(Object source, Object pattern, Object replacement) {
Maybe<?> resolvedSource = resolveImmediately(source);
if (resolvedSource.isAbsent()) return Absent.castAbsent(resolvedSource);
String resolvedSourceStr = String.valueOf(resolvedSource.get());
Maybe<?> resolvedPattern = resolveImmediately(pattern);
if (resolvedPattern.isAbsent()) return Absent.castAbsent(resolvedPattern);
String resolvedPatternStr = String.valueOf(resolvedPattern.get());
Maybe<?> resolvedReplacement = resolveImmediately(replacement);
if (resolvedReplacement.isAbsent()) return Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
String result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr).apply(resolvedSourceStr);
return Maybe.of(result);
}
public static Task<String> regexReplacement(Object source, Object pattern, Object replacement) {
List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(source, pattern, replacement);
Function<List<Object>, String> transformer = new RegexTransformerString(source, pattern, replacement);
return transformMultiple(
MutableMap.of("displayName", String.format("creating regex replacement function (%s:%s)", pattern, replacement)),
transformer,
taskArgs
);
}
public static Maybe<Function<String, String>> regexReplacementImmediately(Object pattern, Object replacement) {
Maybe<?> resolvedPattern = resolveImmediately(pattern);
if (resolvedPattern.isAbsent()) return Absent.castAbsent(resolvedPattern);
String resolvedPatternStr = String.valueOf(resolvedPattern.get());
Maybe<?> resolvedReplacement = resolveImmediately(replacement);
if (resolvedReplacement.isAbsent()) return Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
RegexReplacer result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
return Maybe.<Function<String, String>>of(result);
}
public static Task<Function<String, String>> regexReplacement(Object pattern, Object replacement) {
List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(pattern, replacement);
Function<List<Object>, Function<String, String>> transformer = new RegexTransformerFunction(pattern, replacement);
return transformMultiple(
MutableMap.of("displayName", String.format("creating regex replacement function (%s:%s)", pattern, replacement)),
transformer,
taskArgs
);
}
public static Maybe<ReleaseableLatch> maxConcurrencyImmediately(Object maxThreads) {
Maybe<?> resolvedMaxThreads = resolveImmediately(maxThreads);
if (resolvedMaxThreads.isAbsent()) return Maybe.absent();
Integer resolvedMaxThreadsInt = TypeCoercions.coerce(resolvedMaxThreads, Integer.class);
ReleaseableLatch result = ReleaseableLatch.Factory.newMaxConcurrencyLatch(resolvedMaxThreadsInt);
return Maybe.<ReleaseableLatch>of(result);
}
public static Task<ReleaseableLatch> maxConcurrency(Object maxThreads) {
List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(maxThreads);
Function<List<Object>, ReleaseableLatch> transformer = new MaxThreadsTransformerFunction(maxThreads);
return transformMultiple(
MutableMap.of("displayName", String.format("creating max concurrency semaphore(%s)", maxThreads)),
transformer,
taskArgs
);
}
@SuppressWarnings("unchecked")
private static List<TaskAdaptable<Object>> getTaskAdaptable(Object... args){
List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList();
for (Object arg: args) {
if (arg instanceof TaskAdaptable) {
taskArgs.add((TaskAdaptable<Object>)arg);
} else if (arg instanceof TaskFactory) {
taskArgs.add(((TaskFactory<TaskAdaptable<Object>>)arg).newTask());
}
}
return taskArgs;
}
public static class RegexTransformerString implements Function<List<Object>, String> {
private final Object source;
private final Object pattern;
private final Object replacement;
public RegexTransformerString(Object source, Object pattern, Object replacement){
this.source = source;
this.pattern = pattern;
this.replacement = replacement;
}
@Nullable
@Override
public String apply(@Nullable List<Object> input) {
Iterator<?> taskArgsIterator = input.iterator();
String resolvedSource = resolveArgument(source, taskArgsIterator);
String resolvedPattern = resolveArgument(pattern, taskArgsIterator);
String resolvedReplacement = resolveArgument(replacement, taskArgsIterator);
return new StringFunctions.RegexReplacer(resolvedPattern, resolvedReplacement).apply(resolvedSource);
}
}
@Beta
public static class RegexTransformerFunction implements Function<List<Object>, Function<String, String>> {
private final Object pattern;
private final Object replacement;
public RegexTransformerFunction(Object pattern, Object replacement){
this.pattern = pattern;
this.replacement = replacement;
}
@Override
public Function<String, String> apply(List<Object> input) {
Iterator<?> taskArgsIterator = input.iterator();
return new StringFunctions.RegexReplacer(resolveArgument(pattern, taskArgsIterator), resolveArgument(replacement, taskArgsIterator));
}
}
public static class MaxThreadsTransformerFunction implements Function<List<Object>, ReleaseableLatch> {
private final Object maxThreads;
public MaxThreadsTransformerFunction(Object maxThreads) {
this.maxThreads = maxThreads;
}
@Override
public ReleaseableLatch apply(List<Object> input) {
Iterator<?> taskArgsIterator = input.iterator();
Integer maxThreadsNum = resolveArgument(maxThreads, taskArgsIterator, Integer.class);
return ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxThreadsNum);
}
}
/**
* Same as {@link #resolveArgument(Object, Iterator, Class) with type of String
*/
private static String resolveArgument(Object argument, Iterator<?> taskArgsIterator) {
return resolveArgument(argument, taskArgsIterator, String.class);
}
/**
* Resolves the argument as follows:
*
* If the argument is a DeferredSupplier, we will block and wait for it to resolve. If the argument is TaskAdaptable or TaskFactory,
* we will assume that the resolved task has been queued on the {@code taskArgsIterator}, otherwise the argument has already been resolved.
*
* @param type coerces the return value to the requested type
*/
private static <T> T resolveArgument(Object argument, Iterator<?> taskArgsIterator, Class<T> type) {
Object resolvedArgument;
if (argument instanceof TaskAdaptable) {
resolvedArgument = taskArgsIterator.next();
} else if (argument instanceof DeferredSupplier) {
resolvedArgument = ((DeferredSupplier<?>) argument).get();
} else {
resolvedArgument = argument;
}
return TypeCoercions.coerce(resolvedArgument, type);
}
/** returns a task for parallel execution returning a list of values for the given sensor for the given entity list,
* optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */
public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities) {
return listAttributesWhenReady(sensor, entities, JavaGroovyEquivalents.groovyTruthPredicate());
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities, Closure<Boolean> readiness) {
Predicate<Object> readinessPredicate = (readiness != null) ? GroovyJavaMethods.<Object>predicateFromClosure(readiness) : JavaGroovyEquivalents.groovyTruthPredicate();
return listAttributesWhenReady(sensor, entities, readinessPredicate);
}
/** returns a task for parallel execution returning a list of values of the given sensor list on the given entity,
* optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */
public static <T> Task<List<T>> listAttributesWhenReady(final AttributeSensor<T> sensor, Iterable<Entity> entities, Predicate<? super T> readiness) {
if (readiness == null) readiness = JavaGroovyEquivalents.groovyTruthPredicate();
return builder().attributeWhenReadyFromMultiple(entities, sensor, readiness).build();
}
/** @see #waitForTask(Task, Entity, String) */
public static <T> T waitForTask(Task<T> t, Entity context) throws InterruptedException {
return waitForTask(t, context, null);
}
/** blocks until the given task completes, submitting if necessary, returning the result of that task;
* optional contextMessage is available in status if this is running in a task
*/
@SuppressWarnings("unchecked")
public static <T> T waitForTask(Task<T> t, Entity context, String contextMessage) throws InterruptedException {
try {
return (T) Tasks.resolveValue(t, Object.class, ((EntityInternal)context).getExecutionContext(), contextMessage);
} catch (ExecutionException e) {
throw Throwables.propagate(e);
}
}
public static class AttributeAndSensorCondition<T> {
protected final Entity source;
protected final AttributeSensor<T> sensor;
protected final Predicate<? super T> predicate;
public AttributeAndSensorCondition(Entity source, AttributeSensor<T> sensor, Predicate<? super T> predicate) {
this.source = checkNotNull(source, "source");
this.sensor = checkNotNull(sensor, "sensor");
this.predicate = checkNotNull(predicate, "predicate");
}
@Override
public String toString() {
return JavaClassNames.simpleClassName(this)+"["+source+"["+sensor.getName()+"] "+predicate+"]";
}
}
public static ProtoBuilder builder() {
return new ProtoBuilder();
}
/**
* Builder for producing variants of attributeWhenReady.
*/
@Beta
public static class ProtoBuilder {
/**
* Will wait for the attribute on the given entity, with default behaviour:
* If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort;
* If that entity is stopping or destroyed (see {@link Builder#timeoutIfStoppingOrDestroyed(Duration)}),
* then it will timeout after 1 minute.
*/
public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) {
return new Builder<T2,T2>(source, sensor).abortIfOnFire().timeoutIfStoppingOrDestroyed(Duration.ONE_MINUTE);
}
/**
* Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}.
*/
public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) {
return new Builder<T2,T2>(source, sensor);
}
/** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity,
* optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */
@Beta
public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) {
return attributeWhenReadyFromMultiple(sources, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
/** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */
@Beta
public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness);
}
}
/**
* Builder for producing variants of attributeWhenReady.
*/
public static class Builder<T,V> {
protected Entity source;
protected AttributeSensor<T> sensor;
protected Predicate<? super T> readiness;
protected Function<? super T, ? extends V> postProcess;
protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList();
protected String blockingDetails;
protected Duration timeout;
protected Maybe<V> onTimeout = Maybe.absent();
protected boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED;
protected Maybe<V> onUnmanaged = Maybe.absent();
protected Builder(Entity source, AttributeSensor<T> sensor) {
this.source = source;
this.sensor = sensor;
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public Builder<T,V> readiness(Closure<Boolean> val) {
this.readiness = GroovyJavaMethods.predicateFromClosure(checkNotNull(val, "val"));
return this;
}
public Builder<T,V> readiness(Predicate<? super T> val) {
this.readiness = checkNotNull(val, "ready");
return this;
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public <V2> Builder<T,V2> postProcess(Closure<V2> val) {
this.postProcess = (Function) GroovyJavaMethods.<T,V2>functionFromClosure(checkNotNull(val, "postProcess"));
return (Builder<T,V2>) this;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public <V2> Builder<T,V2> postProcess(final Function<? super T, V2> val) {
this.postProcess = (Function) checkNotNull(val, "postProcess");
return (Builder<T,V2>) this;
}
public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor) {
return abortIf(source, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
return this;
}
/** Causes the depender to abort immediately if {@link Attributes#SERVICE_STATE_ACTUAL}
* is {@link Lifecycle#ON_FIRE}. */
public Builder<T,V> abortIfOnFire() {
abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
return this;
}
/** Causes the depender to timeout after the given time if {@link Attributes#SERVICE_STATE_ACTUAL}
* is one of {@link Lifecycle#STOPPING}, {@link Lifecycle#STOPPED}, or {@link Lifecycle#DESTROYED}. */
public Builder<T,V> timeoutIfStoppingOrDestroyed(Duration time) {
timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.STOPPING), time);
timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.STOPPED), time);
timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.DESTROYED), time);
return this;
}
public Builder<T,V> blockingDetails(String val) {
blockingDetails = val;
return this;
}
/** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */
public Builder<T,V> timeout(Duration val) {
timeout = val;
return this;
}
/** specifies the supplied timeout if the condition is met */
public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate, Duration val) {
if (predicate.apply(source.sensors().get(sensor))) timeout(val);
return this;
}
public Builder<T,V> onTimeoutReturn(V val) {
onTimeout = Maybe.of(val);
return this;
}
public Builder<T,V> onTimeoutThrow() {
onTimeout = Maybe.<V>absent();
return this;
}
public Builder<T,V> onUnmanagedReturn(V val) {
onUnmanaged = Maybe.of(val);
return this;
}
public Builder<T,V> onUnmanagedThrow() {
onUnmanaged = Maybe.<V>absent();
return this;
}
/** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required
* (I can't see why it is; polling will likely give errors, once it is unmanaged this will never completed,
* and before management the current code will continue, so long as there are no other errors) */ @Deprecated
public Builder<T,V> onUnmanagedContinue() {
ignoreUnmanaged = true;
return this;
}
/** take advantage of the fact that this builder can build multiple times, allowing subclasses
* to change the source along the way */
protected Builder<T,V> source(Entity source) {
this.source = source;
return this;
}
/** as {@link #source(Entity)} */
@SuppressWarnings({ "unchecked", "rawtypes" })
protected Builder<T,V> sensor(AttributeSensor<? extends T> sensor) {
this.sensor = (AttributeSensor) sensor;
return this;
}
public Task<V> build() {
validate();
return Tasks.<V>builder().dynamic(false)
.displayName("waiting on "+sensor.getName())
.description("Waiting on sensor "+sensor.getName()+" from "+source)
.tag("attributeWhenReady")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new WaitInTaskForAttributeReady<T,V>(this))
.build();
}
public V runNow() {
validate();
return new WaitInTaskForAttributeReady<T,V>(this).call();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void validate() {
checkNotNull(source, "Entity source");
checkNotNull(sensor, "Sensor");
if (readiness == null) readiness = JavaGroovyEquivalents.groovyTruthPredicate();
if (postProcess == null) postProcess = (Function) Functions.identity();
}
}
/**
* Builder for producing variants of attributeWhenReady.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Beta
public static class MultiBuilder<T, V, V2> {
protected final String name;
protected final String descriptionBase;
protected final Builder<T,V> builder;
// if desired, the use of this multiSource could allow different conditions;
// but probably an easier API just for the caller to build the parallel task
protected final List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList();
protected Function<? super List<V>, V2> postProcessFromMultiple;
/** returns a task for parallel execution returning a list of values of the given sensor list on the given entity,
* optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */
@Beta
protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) {
this(sources, sensor, JavaGroovyEquivalents.groovyTruthPredicate());
}
@Beta
protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
builder = new Builder<T,V>(null, sensor);
builder.readiness(readiness);
for (Entity s : checkNotNull(sources, "sources")) {
multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness));
}
this.name = "waiting on "+sensor.getName();
this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness
+" from "+Iterables.size(sources)+" entit"+Strings.ies(sources);
}
/** Apply post-processing to the entire list of results */
public <V2b> MultiBuilder<T, V, V2b> postProcessFromMultiple(final Function<? super List<V>, V2b> val) {
this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple");
return (MultiBuilder<T,V, V2b>) this;
}
/** Apply post-processing to the entire list of results
* See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(org.apache.brooklyn.util.collections.QuorumCheck, Predicate)
* which allow useful arguments. */
public MultiBuilder<T, V, Boolean> postProcessFromMultiple(final Predicate<? super List<V>> val) {
return postProcessFromMultiple(Functions.forPredicate(val));
}
/**
* @deprecated since 0.11.0; explicit groovy utilities/support will be deleted.
*/
@Deprecated
public <V1> MultiBuilder<T, V1, V2> postProcess(Closure<V1> val) {
builder.postProcess(val);
return (MultiBuilder<T, V1, V2>) this;
}
public <V1> MultiBuilder<T, V1, V2> postProcess(final Function<? super T, V1> val) {
builder.postProcess(val);
return (MultiBuilder<T, V1, V2>) this;
}
public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor) {
builder.abortIf(source, sensor);
return this;
}
public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
builder.abortIf(source, sensor, predicate);
return this;
}
public MultiBuilder<T, V, V2> abortIfOnFire() {
builder.abortIfOnFire();
return this;
}
public MultiBuilder<T, V, V2> blockingDetails(String val) {
builder.blockingDetails(val);
return this;
}
public MultiBuilder<T, V, V2> timeout(Duration val) {
builder.timeout(val);
return this;
}
public MultiBuilder<T, V, V2> onTimeoutReturn(V val) {
builder.onTimeoutReturn(val);
return this;
}
public MultiBuilder<T, V, V2> onTimeoutThrow() {
builder.onTimeoutThrow();
return this;
}
public MultiBuilder<T, V, V2> onUnmanagedReturn(V val) {
builder.onUnmanagedReturn(val);
return this;
}
public MultiBuilder<T, V, V2> onUnmanagedThrow() {
builder.onUnmanagedThrow();
return this;
}
public Task<V2> build() {
List<Task<V>> tasks = MutableList.of();
for (AttributeAndSensorCondition<?> source: multiSource) {
builder.source(source.source);
builder.sensor((AttributeSensor)source.sensor);
builder.readiness((Predicate)source.predicate);
tasks.add(builder.build());
}
final Task<List<V>> parallelTask = Tasks.<List<V>>builder().parallel(true).addAll(tasks)
.displayName(name)
.description(descriptionBase+
(builder.timeout!=null ? ", timeout "+builder.timeout : ""))
.build();
if (postProcessFromMultiple == null) {
// V2 should be the right type in normal operations
return (Task<V2>) parallelTask;
} else {
return Tasks.<V2>builder().displayName(name).description(descriptionBase)
.tag("attributeWhenReady")
.body(new Callable<V2>() {
@Override public V2 call() throws Exception {
List<V> prePostProgress = DynamicTasks.queue(parallelTask).get();
return DynamicTasks.queue(
Tasks.<V2>builder().displayName("post-processing").description("Applying "+postProcessFromMultiple)
.body(Functionals.callable(postProcessFromMultiple, prePostProgress))
.build()).get();
}
})
.build();
}
}
}
}