add a subworkflow step which shares vars with parents
and stricter+better error messages if reducing, targets etc used on custom workflow steps where they shouldn't be
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 87ea371..f154d92 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -148,6 +148,24 @@
@JsonIgnore // persist as sensor but not via REST in case it has secrets resolved
Map<String,Object> inputResolved = MutableMap.of();
+ public boolean hasInput(ConfigKey<?> key) {
+ return hasInput(key.getName());
+ }
+ public boolean hasInput(String key) {
+ return input.containsKey(key);
+ }
+ @JsonIgnore
+ public Map<String, Object> getAllInput() {
+ return input;
+ }
+ @JsonIgnore
+ public Map<String, Object> getAllInputResolved() {
+ return inputResolved;
+ }
+ public void noteInputResolved(String k, Object v) {
+ inputResolved.put(k, v);
+ }
+
Object outputDefinition;
/** final output of the workflow, set at end */
Object output;
@@ -657,7 +675,7 @@
if (vm.isPresent()) {
if (WorkflowStepInstanceExecutionContext.REMEMBER_RESOLVED_INPUT) {
// this will keep spending time resolving, but will resolve the resolved value
- inputResolved.put(key, vm.get());
+ noteInputResolved(key, vm.get());
}
}
return vm;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index dd16038..ab4836f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.Beta;
import com.google.common.reflect.TypeToken;
+import freemarker.core.InvalidReferenceException;
import freemarker.template.TemplateHashModel;
import freemarker.template.TemplateModel;
import freemarker.template.TemplateModelException;
@@ -216,8 +217,6 @@
// the main exception, handled here, is if we are setting an input
candidate = null;
errors.add(t);
- } else {
- throw Exceptions.propagate(t);
}
}
if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
@@ -495,7 +494,7 @@
entry = WorkflowVariableResolutionStackEntry.setting(context, stage, variable);
if (!RESOLVE_STACK.push(entry)) {
entry = null;
- throw new WorkflowVariableRecursiveReference("Recursive reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
+ throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
}
return callable.get();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 704904f..db8543f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -69,6 +69,15 @@
@JsonIgnore // persist as sensor but not via REST in case it has secrets resolved
Map<String,Object> inputResolved = MutableMap.of();
+ @JsonIgnore
+ public Map<String, Object> getAllInput() {
+ return input;
+ }
+ @JsonIgnore
+ public Map<String, Object> getAllInputResolved() {
+ return inputResolved;
+ }
+
transient WorkflowExecutionContext context;
// replay instructions or a string explicit next step identifier
public Object next;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index e56d38e..55681a7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -43,12 +43,14 @@
import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.javalang.Reflections;
import org.apache.brooklyn.util.text.Strings;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@@ -56,9 +58,11 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -76,6 +80,17 @@
private static final String WORKFLOW_SETTING_SHORTHAND = "[ \"replayable\" ${replayable...} ] [ \"retention\" ${retention...} ] ";
+ /* fields which are only permitted in the registered type definition */
+ protected static final Set<String> FORBIDDEN_IN_WORKFLOW_STEP = MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.PARAMETER_DEFS)
+ .stream().map(ConfigKey::getName).collect(Collectors.toSet())).asUnmodifiable();
+
+ protected static final Set<String> FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES = MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.STEPS)
+ .stream().map(ConfigKey::getName).collect(Collectors.toSet())).put("target").asUnmodifiable();
+
+ public static final boolean CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE = false;
+ protected static final Set<String> FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS =
+ (CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE ? MutableSet.<String>of() : MutableSet.of("reducing")).asUnmodifiable();
+
public CustomWorkflowStep() {}
public CustomWorkflowStep(String name, List<Object> steps) {
this.name = name;
@@ -245,7 +260,7 @@
targetR = checkTarget(targetR);
- Map reducingV = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, reducing, Map.class);
+ Map reducingV = initializeReducingVariables(context, reducing);
AtomicInteger index = new AtomicInteger(0);
((Iterable<?>) targetR).forEach(t -> {
@@ -263,6 +278,8 @@
}
});
+ initializeNestedWorkflows(context, nestedWorkflowContext);
+
StepState state = getStepState(context);
state.wasList = wasList;
state.reducing = reducingV;
@@ -407,7 +424,7 @@
assert submitted.isEmpty();
if (lastWorkflowRunBeforeReplay!=null) {
// if interrupted we need to explicitly take from the last step
- reducingV = updateReducingWorkflowVarsFromLastStep(lastWorkflowRunBeforeReplay, reducingV);
+ reducingV = getReducingWorkflowVarsFromLastStep(context, lastWorkflowRunBeforeReplay, reducingV);
}
for (Pair<WorkflowExecutionContext, Task<?>> p : delayedBecauseReducing) {
WorkflowExecutionContext wc = p.getLeft();
@@ -418,7 +435,7 @@
DynamicTasks.queue(p.getRight()).getUnchecked();
- reducingV = updateReducingWorkflowVarsFromLastStep(wc, reducingV);
+ reducingV = getReducingWorkflowVarsFromLastStep(context, wc, reducingV);
}
}
@@ -433,16 +450,22 @@
returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
} else {
context.setOutput(reducingV);
- context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
returnValue = reducingV;
}
return returnValue;
}
- private static MutableMap<String, Object> updateReducingWorkflowVarsFromLastStep(WorkflowExecutionContext lastStep, Map<String, Object> prevWorkflowVars) {
- Map<String, Object> lastStepReducingWorkflowVars = lastStep.getWorkflowScratchVariables();
- Object lastStepOutput = lastStep.getOutput();
+ protected Map initializeReducingVariables(WorkflowStepInstanceExecutionContext context, Map<String, Object> reducing) {
+ return context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, reducing, Map.class);
+ }
+
+ protected void initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, List<WorkflowExecutionContext> nestedWorkflowContext) {
+ }
+
+ protected Map<String, Object> getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext outerContext, WorkflowExecutionContext lastRun, Map<String, Object> prevWorkflowVars) {
+ Map<String, Object> lastStepReducingWorkflowVars = lastRun.getWorkflowScratchVariables();
+ Object lastStepOutput = lastRun.getOutput();
MutableMap<String, Object> result = MutableMap.copyOf(prevWorkflowVars);
prevWorkflowVars.keySet().forEach(k -> {
result.put(k, lastStepReducingWorkflowVars.get(k));
@@ -450,6 +473,11 @@
result.put(k, ((Map) lastStepOutput).get(k));
}
});
+
+ // we return them as output; we also set them as scratch, which might not be necessary / ideal,
+ // but it's what we've done for a while, so don't break that, yet
+ outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(result);
+
return result;
}
@@ -486,7 +514,7 @@
@Override
public WorkflowStepDefinition applySpecialDefinition(ManagementContext mgmt, Object definition, String typeBestGuess, WorkflowStepDefinitionWithSpecialDeserialization firstParse) {
- // if we've resolved a custom workflow step, we need to make sure that the map supplied here
+ // if we've resolved a custom workflow step, we need to make sure that the map supplied as part of the workflow
// - doesn't set parameters
// - doesn't set steps unless it is a simple `workflow` step (not a custom step)
// - (also caller must not override shorthand definition, but that is explicitly removed by WorkflowStepResolution)
@@ -497,19 +525,8 @@
}
CustomWorkflowStep result = (CustomWorkflowStep) firstParse;
Map m = (Map)definition;
- for (String forbiddenKey: new String[] { "parameters" }) {
- if (m.containsKey(forbiddenKey)) {
- throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a workflow step");
- }
- }
- if (!isPermittedToSetSteps(typeBestGuess)) {
- // custom workflow step
- for (String forbiddenKey : new String[]{"steps"}) {
- if (m.containsKey(forbiddenKey)) {
- throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a custom workflow step");
- }
- }
- }
+ checkCallerSuppliedDefinition(typeBestGuess, m);
+
if (m.containsKey("output")) {
// need to restore the workflow output from the base definition
try {
@@ -526,8 +543,34 @@
return result;
}
- protected boolean isPermittedToSetSteps(String typeBestGuess) {
- return typeBestGuess==null || SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || CustomWorkflowStep.class.getName().equals(typeBestGuess);
+ protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+ // caller (workflow author) cannot set parameters, that makes no sense
+ FORBIDDEN_IN_WORKFLOW_STEP.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a workflow step");
+ });
+
+ if (!CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE && !isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+ // caller can't specify these
+ FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted to set '" + forbiddenKey + "' when using a custom workflow step");
+ });
+ // neither should the custom registered type itself!
+ FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(k -> (Reflections.getFieldValueMaybe(this, k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted for a custom workflow step to use '" + forbiddenKey + "'");
+ });
+ }
+ if (!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+ FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a custom workflow step");
+ });
+ }
+ }
+
+ protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+ return !isRegisteredTypeExtensionToClass(CustomWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+ }
+ protected boolean isRegisteredTypeExtensionToClass(Class<? extends CustomWorkflowStep> clazz, String shorthandDefault, String typeBestGuess) {
+ return typeBestGuess!=null && !shorthandDefault.equals(typeBestGuess) && !clazz.getName().equals(typeBestGuess);
}
protected WorkflowExecutionContext newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, Integer targetIndexOrNull) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
index 3c179bc..e4a511c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
@@ -55,8 +55,8 @@
throw new IllegalArgumentException("Target of foreach must be a list or an expression that resolves to a list, not "+targetR);
}
- protected boolean isPermittedToSetSteps(String typeBestGuess) {
- return typeBestGuess==null || SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || ForeachWorkflowStep.class.getName().equals(typeBestGuess);
+ protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+ return !isRegisteredTypeExtensionToClass(ForeachWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
}
protected void initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, Object target, WorkflowExecutionContext nestedWorkflowContext) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
new file mode 100644
index 0000000..0e09785
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow.steps.flow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.workflow.WorkflowCommonConfig;
+import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
+import org.apache.brooklyn.core.workflow.WorkflowStepDefinition;
+import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.javalang.Reflections;
+
+public class SubWorkflowStep extends CustomWorkflowStep {
+
+ public static final String SHORTHAND_TYPE_NAME_DEFAULT = "subworkflow";
+
+ protected static final Set<String> FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS = MutableSet.copyOf(FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS)
+ .putAll(MutableSet.of("target", "concurrency")).asUnmodifiable();
+
+ public SubWorkflowStep() {}
+
+ public SubWorkflowStep(CustomWorkflowStep base) {
+ super(base);
+ }
+
+ protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+ return !isRegisteredTypeExtensionToClass(SubWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+ }
+
+ protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+ if (!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+ throw new IllegalArgumentException("Not permitted to define a custom subworkflow step");
+ }
+ // these can't be set by user or registered type for subworkflow
+ FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted to set '" + forbiddenKey + "' when using a subworkflow step");
+ });
+ FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(k -> (Reflections.getFieldValueMaybe(this, k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+ throw new IllegalArgumentException("Not permitted for a subworkflow step to use '" + forbiddenKey + "'");
+ });
+ }
+
+ @Override
+ protected Map initializeReducingVariables(WorkflowStepInstanceExecutionContext context, Map<String, Object> reducing) {
+ return super.initializeReducingVariables(context, context.getWorkflowExectionContext().getWorkflowScratchVariables());
+ }
+
+ @Override
+ protected void initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, List<WorkflowExecutionContext> nestedWorkflowContext) {
+ // wouldn't work if we iterated in the sub-workflow; but it doesn't allow an iterable target
+ outerContext.getWorkflowExectionContext().getAllInput().forEach( (k,v) -> {
+ if (!outerContext.hasInput(k)) {
+ nestedWorkflowContext.forEach((c -> c.getAllInput().put(k, v)));
+ }
+ });
+ }
+
+ @Override
+ protected Map<String, Object> getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext outerContext, WorkflowExecutionContext lastRun, Map<String, Object> prevWorkflowVars) {
+ // wouldn't work if we iterated in the sub-workflow; but it doesn't allow an iterable target
+ lastRun.getAllInputResolved().forEach( (k,v) -> {
+ if (outerContext.getWorkflowExectionContext().hasInput(k)) outerContext.getWorkflowExectionContext().noteInputResolved(k, v);
+ });
+
+ outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(lastRun.getWorkflowScratchVariables());
+
+ // output should just be last step, not the reduced vars
+ return null;
+ }
+}
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
index 9c4bd30..dba25bf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
@@ -77,6 +77,7 @@
import org.apache.brooklyn.core.workflow.steps.flow.RetryWorkflowStep;
import org.apache.brooklyn.core.workflow.steps.flow.ReturnWorkflowStep;
import org.apache.brooklyn.core.workflow.steps.flow.SleepWorkflowStep;
+import org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep;
import org.apache.brooklyn.core.workflow.steps.flow.SwitchWorkflowStep;
import org.apache.brooklyn.core.workflow.steps.variables.ClearVariableWorkflowStep;
import org.apache.brooklyn.core.workflow.steps.variables.LoadWorkflowStep;
@@ -155,6 +156,7 @@
addRegisteredTypeBean(mgmt, "retry", RetryWorkflowStep.class);
addRegisteredTypeBean(mgmt, "workflow", CustomWorkflowStep.class);
+ addRegisteredTypeBean(mgmt, "subworkflow", SubWorkflowStep.class);
addRegisteredTypeBean(mgmt, "foreach", ForeachWorkflowStep.class);
addRegisteredTypeBean(mgmt, "ssh", SshWorkflowStep.class);
addRegisteredTypeBean(mgmt, "shell", ShellWorkflowStep.class);
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index 36667e9..20f59d5 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -18,6 +18,12 @@
*/
package org.apache.brooklyn.core.workflow;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
@@ -61,13 +67,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<TestApplication> {
private static final Logger log = LoggerFactory.getLogger(WorkflowNestedAndCustomExtensionTest.class);
@@ -174,8 +173,15 @@
Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging(MutableList.of(
MutableMap.of("type", "log-hi",
- "steps", MutableList.of("return not allowed to override")))),
- e -> Asserts.expectedFailureContainsIgnoreCase(e, "steps"));
+ "steps", MutableList.of("return should have failed because not allowed to override")))),
+ Asserts.expectedFailureContainsIgnoreCase("error", "in definition", "step 1", "steps=", "should have failed"));
+
+ Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging(MutableList.of(
+ "log-hi")),
+ Asserts.expectedFailureContainsIgnoreCase("evaluated to null or missing", "name").and(
+ Asserts.expectedFailureDoesNotContainIgnoreCase("recursive")
+ )
+ );
}
@Test
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
new file mode 100644
index 0000000..dc6fe61
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.typereg.RegisteredType;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypePlanTransformer;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.typereg.BasicTypeImplementationPlan;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+public class WorkflowSubAndCustomExtensionEdgeTest extends RebindTestFixture<TestApplication> {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkflowSubAndCustomExtensionEdgeTest.class);
+
+ @Override
+ protected LocalManagementContext decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
+ WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+ app = null; // clear this
+ mgmt.getBrooklynProperties().put(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT, "forever");
+ return super.decorateOrigOrNewManagementContext(mgmt);
+ }
+
+ @Override
+ protected TestApplication createApp() {
+ return null;
+ }
+
+ @Override protected TestApplication rebind() throws Exception {
+ return rebind(RebindOptions.create().terminateOrigManagementContext(true));
+ }
+
+ public RegisteredType addBeanWithType(String typeName, String version, String plan) {
+ return BrooklynAppUnitTestSupport.addRegisteredTypeBean(mgmt(), typeName, version,
+ new BasicTypeImplementationPlan(BeanWithTypePlanTransformer.FORMAT, plan));
+ }
+
+ TestApplication app;
+ Task<?> lastInvocation;
+
+ Object runWorkflow(List<Object> steps) throws Exception {
+ return runWorkflow(steps, null);
+ }
+ Object runWorkflow(List<Object> steps, ConfigBag extraEffectorConfig) throws Exception {
+ if (app==null) app = mgmt().getEntityManager().createEntity(EntitySpec.create(TestApplication.class));
+ WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+ .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+ .configure(WorkflowEffector.STEPS, steps)
+ .putAll(extraEffectorConfig));
+ eff.apply((EntityLocal)app);
+
+ lastInvocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
+ return lastInvocation.getUnchecked();
+ }
+
+ @Test
+ public void testVisibilityOfReducingWhenDefiningCustomWorkflowStep() throws Exception {
+ // test the fixture is right
+ MutableMap<String, Serializable> basicReducingStep = MutableMap.of(
+ "type", "workflow",
+ "reducing", MutableMap.of("hello_word", "${hello_word}"),
+ "steps", MutableList.of("set-sensor hi = ${hello_word} world"));
+ runWorkflow(MutableList.of("let hello_word = HI", basicReducingStep));
+ EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(String.class, "hi"), "HI world");
+
+ addBeanWithType("set-sensor-hi-reducing", "1-SNAPSHOT", Strings.lines(
+ "type: workflow",
+ "parameters:",
+ " value: {}",
+ "reducing:",
+ " hello_word: ${hello_word}",
+ "steps:",
+ " - let hi_word = ${hello_word} ?? hi",
+ " - set-sensor hi = ${hi_word} ${value}",
+ " - let hello_word = bye"
+ ));
+
+ if (CustomWorkflowStep.CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE) {
+ runWorkflow(MutableList.of(
+ "let hello_word = HELLO",
+ MutableMap.of("type", "set-sensor-hi-reducing", "input", MutableMap.of("value", "bob")),
+ "return ${hello_word}"));
+ Asserts.assertEquals(lastInvocation.getUnchecked(), "bye");
+ EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(String.class, "hi"), "HELLO bob");
+ } else {
+ Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+ "let hello_word = HELLO",
+ MutableMap.of("type", "set-sensor-hi-reducing", "input", MutableMap.of("value", "bob")),
+ "return ${hello_word}")),
+ Asserts.expectedFailureContainsIgnoreCase("not permitted", "reducing"));
+ }
+
+ addBeanWithType("set-sensor-hi", "1-SNAPSHOT", Strings.lines(
+ "type: workflow",
+ "parameters:",
+ " value: {}",
+ "steps:",
+ " - set-sensor hi = hi ${value}"
+ ));
+ // value is a poor choice with set-sensor, because set-sensor tries to evaluate the input; but let's make sure the message is not too confusing
+ Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+ "let hello_word = HI",
+ "set-sensor-hi")),
+ Asserts.expectedFailureContainsIgnoreCase("recursive or missing reference","value")
+ );
+ }
+
+ @Test
+ public void testSubWorkflowStep() throws Exception {
+ runWorkflow(MutableList.of(
+ "let v1 = V1",
+ "let v2 = V2",
+ MutableMap.of("step", "subworkflow",
+ "steps", MutableList.of(
+ "let v0 = ${v0}B",
+ "let v1 = ${v1}B",
+ "let v3 = V3B",
+ "return done")),
+ "return ${v0}-${v1}-${v2}-${v3}-${output}"),
+ ConfigBag.newInstance().configure(WorkflowCommonConfig.INPUT, MutableMap.of("v0", "V0")) );
+ Asserts.assertEquals(lastInvocation.getUnchecked(), "V0B-V1B-V2-V3B-done");
+ }
+
+}
diff --git a/karaf/init/src/main/resources/catalog.bom b/karaf/init/src/main/resources/catalog.bom
index ba10136..c606ff1 100644
--- a/karaf/init/src/main/resources/catalog.bom
+++ b/karaf/init/src/main/resources/catalog.bom
@@ -184,6 +184,11 @@
itemType: bean
item:
type: org.apache.brooklyn.core.workflow.steps.flow.ForeachWorkflowStep
+ - id: subworkflow
+ format: java-type-name
+ itemType: bean
+ item:
+ type: org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep
- id: retry
format: java-type-name
itemType: bean
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index fdfe11a..fedcd94 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1239,6 +1239,18 @@
}
}
}
+
+ public static void assertStringDoesNotContainIgnoreCase(String input, String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+ if (input==null) fail("Input is null.");
+ if (phrase1ToNotContain!=null) {
+ assertThat(input, Predicates.not(StringPredicates.containsLiteralIgnoreCase(phrase1ToNotContain)));
+ }
+ for (String otherPhrase: optionalOtherPhrasesToNotContain) {
+ if (otherPhrase!=null) {
+ assertThat(input, Predicates.not(StringPredicates.containsLiteralIgnoreCase(otherPhrase)));
+ }
+ }
+ }
public static void assertStringMatchesRegex(String input, String regex1ToMatch, String ...optionalOtherRegexesToMatch) {
if (input==null) fail("Input is null.");
@@ -1369,7 +1381,22 @@
}
return true;
}
-
+ public static boolean expectedFailureDoesNotContainIgnoreCase(Throwable e, String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+ if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
+ try {
+ assertStringDoesNotContainIgnoreCase(Exceptions.collapseText(e), phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+ } catch (AssertionError ee) {
+ rethrowPreferredException(e, ee);
+ }
+ return true;
+ }
+ public static Predicate<Throwable> expectedFailureDoesNotContain( String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+ return e -> expectedFailureDoesNotContain(e, phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+ }
+ public static Predicate<Throwable> expectedFailureDoesNotContainIgnoreCase(String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+ return e -> expectedFailureDoesNotContainIgnoreCase(e, phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+ }
+
/** Implements the return behavior for {@link #expectedFailureOfType(Throwable, Class, Class...)} and others,
* to log interesting earlier errors but to suppress those which are internal or redundant. */
private static void rethrowPreferredException(Throwable earlierPreferredIfFatalElseLogged, Throwable laterPreferredOtherwise) throws AssertionError {