blob: 6e975e410a14e6df06a02e9fee22fe8894922397 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.brooklyn.core.workflow.steps.variables;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.typereg.RegisteredType;
import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.typereg.RegisteredTypeLoadingContexts;
import org.apache.brooklyn.core.typereg.RegisteredTypes;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
import org.apache.brooklyn.core.workflow.WorkflowStepDefinition;
import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext;
import org.apache.brooklyn.util.collections.*;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.apache.brooklyn.core.workflow.WorkflowExecutionContext.STEP_TARGET_NAME_FOR_END;
import static org.apache.brooklyn.core.workflow.steps.variables.SetVariableWorkflowStep.setWorkflowScratchVariableDotSeparated;
public class TransformVariableWorkflowStep extends WorkflowStepDefinition {
private static final Logger log = LoggerFactory.getLogger(TransformVariableWorkflowStep.class);
public static final String SHORTHAND =
"[ [ ${variable.type} ] [ ?${value_is_initial} \"value\" ] ${} " +
"[ [ \"=\" ${value...} ] \"|\" ${transform...} ] ]";
public static final ConfigKey<TypedValueToSet> VARIABLE = ConfigKeys.newConfigKey(TypedValueToSet.class, "variable");
public static final ConfigKey<Boolean> VALUE_IS_INITIAL = ConfigKeys.newConfigKey(Boolean.class, "value_is_initial");
public static final ConfigKey<Object> VALUE = ConfigKeys.newConfigKey(Object.class, "value");
public static final ConfigKey<Object> TRANSFORM = ConfigKeys.newConfigKey(Object.class, "transform");
public void populateFromShorthand(String expression) {
populateFromShorthandTemplate(SHORTHAND, expression, true, true);
public void validateStep(@Nullable ManagementContext mgmt, @Nullable WorkflowExecutionContext workflow) {
super.validateStep(mgmt, workflow);
if (!input.containsKey(VARIABLE.getName())) {
throw new IllegalArgumentException("Variable name is required");
if (!input.containsKey(TRANSFORM.getName())) {
throw new IllegalArgumentException("Transform is required");
protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
TypedValueToSet variable = context.getInput(VARIABLE);
if (variable==null) throw new IllegalArgumentException("Variable name is required");
String name = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT,, String.class);
if (Strings.isBlank(name)) throw new IllegalArgumentException("Variable name is required");
Object transformO = context.getInput(TRANSFORM);
if (!(transformO instanceof Iterable)) transformO = MutableList.of(transformO);
List<String> transforms = MutableList.of();
for (Object t: (Iterable)transformO) {
if (t instanceof String) transforms.addAll(MutableList.copyOf( ((String)t).split("\\|") ).map(String::trim).collect(Collectors.toList())));
else throw new IllegalArgumentException("Argument to transform should be a string or list of strings, not: "+t);
Object v;
if (input.containsKey(VALUE.getName())) {
if (Boolean.TRUE.equals(context.getInput(VALUE_IS_INITIAL))) throw new IllegalArgumentException("Cannot specifiy value_is_initial (keyword \"value\") with an = value");
v = input.get(VALUE.getName());
if (Strings.isNonBlank(variable.type)) transforms.add("type "+variable.type);
} else {
v = "${" + + "}";
if (Boolean.TRUE.equals(context.getInput(VALUE_IS_INITIAL)) || "value".equals(variable.type)) {
// special keyword to treat name as a literal expression
v =;
if (Strings.isNonBlank(variable.type)) {
if (Boolean.TRUE.equals(context.getInput(VALUE_IS_INITIAL)) || !"value".equals(variable.type)) {
transforms.add(0, "type " + variable.type);
name = null;
boolean isResolved = false;
for (String t: transforms) {
WorkflowTransformWithContext tt = getTransform(context, t);
if (tt.isResolver()) {
if (isResolved) throw new IllegalArgumentException("Transform '"+t+"' must be first as it requires unresolved input");
isResolved = true;
} else {
if (!isResolved) {
v = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, v, TypeToken.of(Object.class));
isResolved = true;
v = tt.apply(v);
if (name!=null) {
Object oldValue = setWorkflowScratchVariableDotSeparated(context, name, v);
context.noteOtherMetadata("Value set", v);
if (oldValue != null) context.noteOtherMetadata("Previous value", oldValue);
if (context.getOutput()!=null) throw new IllegalStateException("Transform that produces output results cannot be used when setting a variable");
if (STEP_TARGET_NAME_FOR_END.equals( throw new IllegalStateException("Return transform cannot be used when setting a variable");
return context.getPreviousStepOutput();
} else {
return v;
static WorkflowTransformWithContext transformOf(final Function f) {
if (f instanceof WorkflowTransformWithContext) return (WorkflowTransformWithContext) f;
return new WorkflowTransformDefault() {
public Object apply(Object o) {
return f.apply(o);
WorkflowTransformWithContext getTransform(WorkflowStepInstanceExecutionContext context, String transformDef) {
List<String> transformWords = Arrays.asList(transformDef.split(" "));
String transformType = transformWords.get(0);
Function t = Maybe.ofDisallowingNull(TRANSFORMATIONS.get(transformType)).map(Supplier::get).orNull();
if (t==null) {
RegisteredTypeLoadingContext lc = RegisteredTypeLoadingContexts.withLoader(
RegisteredType rt = context.getManagementContext().getTypeRegistry().get(transformType, lc);
if (rt!=null) t = context.getManagementContext().getTypeRegistry().create(rt, lc, WorkflowTransform.class);
if (t==null) throw new IllegalStateException("Unknown transform '"+transformType+"'");
WorkflowTransformWithContext ta = transformOf(t);
ta.init(context.getWorkflowExectionContext(), context, transformWords, transformDef);
return ta;
private final static Map<String, Supplier<Function>> TRANSFORMATIONS = MutableMap.of();
static {
TRANSFORMATIONS.put("trim", () -> new TransformTrim());
TRANSFORMATIONS.put("merge", () -> new TransformMerge());
TRANSFORMATIONS.put("json", () -> new TransformJsonish(true, false, false));
TRANSFORMATIONS.put("yaml", () -> new TransformJsonish(false, true, false));
TRANSFORMATIONS.put("bash", () -> new TransformJsonish(true, false, true));
TRANSFORMATIONS.put("replace", () -> new TransformReplace());
TRANSFORMATIONS.put("wait", () -> new TransformWait());
TRANSFORMATIONS.put("type", () -> new TransformType());
TRANSFORMATIONS.put("first", () -> x -> {
if (x==null) return null;
if (!(x instanceof Iterable)) throw new IllegalArgumentException("Cannot take first of non-list type "+x);
Iterator xi = ((Iterable) x).iterator();
if (xi.hasNext()) return;
return null;
TRANSFORMATIONS.put("last", () -> x -> {
if (x==null) return null;
if (!(x instanceof Iterable)) throw new IllegalArgumentException("Cannot take first of non-list type "+x);
Iterator xi = ((Iterable) x).iterator();
Object last = null;
while (xi.hasNext()) last =;
return last;
TRANSFORMATIONS.put("max", () -> v -> minmax(v, "max", i -> i>0));
TRANSFORMATIONS.put("min", () -> v -> minmax(v, "min", i -> i<0));
TRANSFORMATIONS.put("sum", () -> v -> sum(v, "sum"));
TRANSFORMATIONS.put("average", () -> v -> average(v, "average"));
TRANSFORMATIONS.put("size", () -> v -> size(v, "size"));
TRANSFORMATIONS.put("get", () -> v -> {
if (v instanceof Supplier) return ((Supplier)v).get();
return v;
TRANSFORMATIONS.put("to_string", () -> v -> Strings.toString(v));
TRANSFORMATIONS.put("to_upper_case", () -> v -> ((String)v).toUpperCase());
TRANSFORMATIONS.put("to_lower_case", () -> v -> ((String)v).toLowerCase());
TRANSFORMATIONS.put("return", () -> new TransformReturn());
TRANSFORMATIONS.put("set", () -> new TransformSetWorkflowVariable());
static final Object minmax(Object v, String word, Predicate<Integer> test) {
if (v==null) return null;
if (!(v instanceof Iterable)) throw new IllegalArgumentException("Value is not an iterable; cannot take "+word);
Object result = null;
for (Object vi: (Iterable)v) {
if (result==null) result = vi;
else if (!(vi instanceof Comparable)) throw new IllegalArgumentException("Argument is not comparable; cannot take "+word);
else if (test.test( ((Comparable)vi).compareTo(result) )) result = vi;
return result;
static final Object sum(Object v, String word) {
if (v==null) return null;
if (!(v instanceof Iterable)) throw new IllegalArgumentException("Value is not an iterable; cannot take "+word);
Iterable<?> vi = (Iterable<?>) v;
if (vi.iterator().hasNext() && vi.iterator().next() instanceof Duration) {
Duration result = Duration.ZERO;
for (Object vii: vi) {
result = result.add(TypeCoercions.coerce(vii, Duration.class));
return result;
double result = 0;
for (Object vii: vi) {
result += asDouble(vii).get();
return simplifiedToIntOrLongIfPossible(result);
static Maybe<Double> asDouble(Object x) {
Maybe<Double> v = TypeCoercions.tryCoerce(x, Double.class);
if (v.isPresent() && !Double.isFinite(v.get())) return Maybe.absent(() -> new IllegalArgumentException("Value cannot be coerced to double: "+v));
return v;
static Number simplifiedToIntOrLongIfPossible(Number x) {
if (x instanceof Integer) return x;
if (x instanceof Long) {
if (x.longValue() == x.intValue()) return x.intValue();
return x;
if (x instanceof Double || x instanceof Float) {
double xd = x.doubleValue();
if (Math.abs(xd - Math.round(xd)) < 0.000000001) {
return simplifiedToIntOrLongIfPossible(Math.round(xd));
return x;
static final Integer size(Object v, String word) {
if (v==null) return null;
if (v instanceof Iterable) return Iterables.size((Iterable<?>) v);
if (v instanceof Map) return ((Map)v).size();
throw new IllegalArgumentException("Argument is not a set or map; cannot compute "+word);
static final Object average(Object v, String word) {
if (v==null) return null;
Integer count = size(v, "average");
if (count==null || count==0) throw new IllegalArgumentException("Value is empty; cannot take "+word);
Object sum = sum(v, "average");
if (sum instanceof Duration) {
return Duration.millis(Math.round(asDouble( ((Duration)sum).toMilliseconds() ).get() / count) );
return simplifiedToIntOrLongIfPossible(asDouble(sum).get() / count);
@Override protected Boolean isDefaultIdempotent() { return true; }