add a subworkflow step which shares vars with parents

and stricter+better error messages if reducing, targets etc used on custom workflow steps where they shouldn't be
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 87ea371..f154d92 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -148,6 +148,24 @@
     @JsonIgnore  // persist as sensor but not via REST in case it has secrets resolved
     Map<String,Object> inputResolved = MutableMap.of();
 
+    public boolean hasInput(ConfigKey<?> key) {
+        return hasInput(key.getName());
+    }
+    public boolean hasInput(String key) {
+        return input.containsKey(key);
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInput() {
+        return input;
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInputResolved() {
+        return inputResolved;
+    }
+    public void noteInputResolved(String k, Object v) {
+        inputResolved.put(k, v);
+    }
+
     Object outputDefinition;
     /** final output of the workflow, set at end */
     Object output;
@@ -657,7 +675,7 @@
         if (vm.isPresent()) {
             if (WorkflowStepInstanceExecutionContext.REMEMBER_RESOLVED_INPUT) {
                 // this will keep spending time resolving, but will resolve the resolved value
-                inputResolved.put(key, vm.get());
+                noteInputResolved(key, vm.get());
             }
         }
         return vm;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index dd16038..ab4836f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -20,6 +20,7 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.reflect.TypeToken;
+import freemarker.core.InvalidReferenceException;
 import freemarker.template.TemplateHashModel;
 import freemarker.template.TemplateModel;
 import freemarker.template.TemplateModelException;
@@ -216,8 +217,6 @@
                         // the main exception, handled here, is if we are setting an input
                         candidate = null;
                         errors.add(t);
-                    } else {
-                        throw Exceptions.propagate(t);
                     }
                 }
                 if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
@@ -495,7 +494,7 @@
             entry = WorkflowVariableResolutionStackEntry.setting(context, stage, variable);
             if (!RESOLVE_STACK.push(entry)) {
                 entry = null;
-                throw new WorkflowVariableRecursiveReference("Recursive reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
+                throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
             }
 
             return callable.get();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 704904f..db8543f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -69,6 +69,15 @@
     @JsonIgnore  // persist as sensor but not via REST in case it has secrets resolved
     Map<String,Object> inputResolved = MutableMap.of();
 
+    @JsonIgnore
+    public Map<String, Object> getAllInput() {
+        return input;
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInputResolved() {
+        return inputResolved;
+    }
+
     transient WorkflowExecutionContext context;
     // replay instructions or a string explicit next step identifier
     public Object next;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index e56d38e..55681a7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -43,12 +43,14 @@
 import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.javalang.Reflections;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -56,9 +58,11 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -76,6 +80,17 @@
 
     private static final String WORKFLOW_SETTING_SHORTHAND = "[ \"replayable\" ${replayable...} ] [ \"retention\" ${retention...} ] ";
 
+    /* fields which are only permitted in the registered type definition */
+    protected static final Set<String> FORBIDDEN_IN_WORKFLOW_STEP = MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.PARAMETER_DEFS)
+            .stream().map(ConfigKey::getName).collect(Collectors.toSet())).asUnmodifiable();
+
+    protected static final Set<String> FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES = MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.STEPS)
+            .stream().map(ConfigKey::getName).collect(Collectors.toSet())).put("target").asUnmodifiable();
+
+    public static final boolean CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE = false;
+    protected static final Set<String> FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS =
+            (CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE ? MutableSet.<String>of() : MutableSet.of("reducing")).asUnmodifiable();
+
     public CustomWorkflowStep() {}
     public CustomWorkflowStep(String name, List<Object> steps) {
         this.name = name;
@@ -245,7 +260,7 @@
 
         targetR = checkTarget(targetR);
 
-        Map reducingV = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, reducing, Map.class);
+        Map reducingV = initializeReducingVariables(context, reducing);
 
         AtomicInteger index = new AtomicInteger(0);
         ((Iterable<?>) targetR).forEach(t -> {
@@ -263,6 +278,8 @@
             }
         });
 
+        initializeNestedWorkflows(context, nestedWorkflowContext);
+
         StepState state = getStepState(context);
         state.wasList = wasList;
         state.reducing = reducingV;
@@ -407,7 +424,7 @@
             assert submitted.isEmpty();
             if (lastWorkflowRunBeforeReplay!=null) {
                 // if interrupted we need to explicitly take from the last step
-                reducingV = updateReducingWorkflowVarsFromLastStep(lastWorkflowRunBeforeReplay, reducingV);
+                reducingV = getReducingWorkflowVarsFromLastStep(context, lastWorkflowRunBeforeReplay, reducingV);
             }
             for (Pair<WorkflowExecutionContext, Task<?>> p : delayedBecauseReducing) {
                 WorkflowExecutionContext wc = p.getLeft();
@@ -418,7 +435,7 @@
 
                 DynamicTasks.queue(p.getRight()).getUnchecked();
 
-                reducingV = updateReducingWorkflowVarsFromLastStep(wc, reducingV);
+                reducingV = getReducingWorkflowVarsFromLastStep(context, wc, reducingV);
             }
         }
 
@@ -433,16 +450,22 @@
             returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
         } else {
             context.setOutput(reducingV);
-            context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
             returnValue = reducingV;
         }
 
         return returnValue;
     }
 
-    private static MutableMap<String, Object> updateReducingWorkflowVarsFromLastStep(WorkflowExecutionContext lastStep, Map<String, Object> prevWorkflowVars) {
-        Map<String, Object> lastStepReducingWorkflowVars = lastStep.getWorkflowScratchVariables();
-        Object lastStepOutput = lastStep.getOutput();
+    protected Map initializeReducingVariables(WorkflowStepInstanceExecutionContext context, Map<String, Object> reducing) {
+        return context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, reducing, Map.class);
+    }
+
+    protected void initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, List<WorkflowExecutionContext> nestedWorkflowContext) {
+    }
+
+    protected Map<String, Object> getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext outerContext, WorkflowExecutionContext lastRun, Map<String, Object> prevWorkflowVars) {
+        Map<String, Object> lastStepReducingWorkflowVars = lastRun.getWorkflowScratchVariables();
+        Object lastStepOutput = lastRun.getOutput();
         MutableMap<String, Object> result = MutableMap.copyOf(prevWorkflowVars);
         prevWorkflowVars.keySet().forEach(k -> {
             result.put(k, lastStepReducingWorkflowVars.get(k));
@@ -450,6 +473,11 @@
                 result.put(k, ((Map) lastStepOutput).get(k));
             }
         });
+
+        // we return them as output; we also set them as scratch, which might not be necessary / ideal,
+        // but it's what we've done for a while, so don't break that, yet
+        outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(result);
+
         return result;
     }
 
@@ -486,7 +514,7 @@
 
     @Override
     public WorkflowStepDefinition applySpecialDefinition(ManagementContext mgmt, Object definition, String typeBestGuess, WorkflowStepDefinitionWithSpecialDeserialization firstParse) {
-        // if we've resolved a custom workflow step, we need to make sure that the map supplied here
+        // if we've resolved a custom workflow step, we need to make sure that the map supplied as part of the workflow
         // - doesn't set parameters
         // - doesn't set steps unless it is a simple `workflow` step (not a custom step)
         // - (also caller must not override shorthand definition, but that is explicitly removed by WorkflowStepResolution)
@@ -497,19 +525,8 @@
         }
         CustomWorkflowStep result = (CustomWorkflowStep) firstParse;
         Map m = (Map)definition;
-        for (String forbiddenKey: new String[] { "parameters" }) {
-            if (m.containsKey(forbiddenKey)) {
-                throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a workflow step");
-            }
-        }
-        if (!isPermittedToSetSteps(typeBestGuess)) {
-            // custom workflow step
-            for (String forbiddenKey : new String[]{"steps"}) {
-                if (m.containsKey(forbiddenKey)) {
-                    throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a custom workflow step");
-                }
-            }
-        }
+        checkCallerSuppliedDefinition(typeBestGuess, m);
+
         if (m.containsKey("output")) {
             // need to restore the workflow output from the base definition
             try {
@@ -526,8 +543,34 @@
         return result;
     }
 
-    protected boolean isPermittedToSetSteps(String typeBestGuess) {
-        return typeBestGuess==null || SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || CustomWorkflowStep.class.getName().equals(typeBestGuess);
+    protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+        // caller (workflow author) cannot set parameters, that makes no sense
+        FORBIDDEN_IN_WORKFLOW_STEP.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+            throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a workflow step");
+        });
+
+        if (!CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE && !isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            // caller can't specify these
+            FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+                throw new IllegalArgumentException("Not permitted to set '" + forbiddenKey + "' when using a custom workflow step");
+            });
+            // neither should the custom registered type itself!
+            FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(k -> (Reflections.getFieldValueMaybe(this, k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+                throw new IllegalArgumentException("Not permitted for a custom workflow step to use '" + forbiddenKey + "'");
+            });
+        }
+        if (!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+                throw new IllegalArgumentException("Not permitted to override '" + forbiddenKey + "' when using a custom workflow step");
+            });
+        }
+    }
+
+    protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(CustomWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+    }
+    protected boolean isRegisteredTypeExtensionToClass(Class<? extends CustomWorkflowStep> clazz, String shorthandDefault, String typeBestGuess) {
+        return typeBestGuess!=null && !shorthandDefault.equals(typeBestGuess) && !clazz.getName().equals(typeBestGuess);
     }
 
     protected WorkflowExecutionContext newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, Integer targetIndexOrNull) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
index 3c179bc..e4a511c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
@@ -55,8 +55,8 @@
         throw new IllegalArgumentException("Target of foreach must be a list or an expression that resolves to a list, not "+targetR);
     }
 
-    protected boolean isPermittedToSetSteps(String typeBestGuess) {
-        return typeBestGuess==null || SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || ForeachWorkflowStep.class.getName().equals(typeBestGuess);
+    protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(ForeachWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
     }
 
     protected void initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, Object target, WorkflowExecutionContext nestedWorkflowContext) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
new file mode 100644
index 0000000..0e09785
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow.steps.flow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.workflow.WorkflowCommonConfig;
+import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
+import org.apache.brooklyn.core.workflow.WorkflowStepDefinition;
+import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.javalang.Reflections;
+
+public class SubWorkflowStep extends CustomWorkflowStep {
+
+    public static final String SHORTHAND_TYPE_NAME_DEFAULT = "subworkflow";
+
+    protected static final Set<String> FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS = MutableSet.copyOf(FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS)
+            .putAll(MutableSet.of("target", "concurrency")).asUnmodifiable();
+
+    public SubWorkflowStep() {}
+
+    public SubWorkflowStep(CustomWorkflowStep base) {
+        super(base);
+    }
+
+    protected boolean isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(SubWorkflowStep.class, SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+    }
+
+    protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+        if (!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            throw new IllegalArgumentException("Not permitted to define a custom subworkflow step");
+        }
+        // these can't be set by user or registered type for subworkflow
+        FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(m::containsKey).forEach(forbiddenKey -> {
+            throw new IllegalArgumentException("Not permitted to set '" + forbiddenKey + "' when using a subworkflow step");
+        });
+        FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(k -> (Reflections.getFieldValueMaybe(this, k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+            throw new IllegalArgumentException("Not permitted for a subworkflow step to use '" + forbiddenKey + "'");
+        });
+    }
+
+    @Override
+    protected Map initializeReducingVariables(WorkflowStepInstanceExecutionContext context, Map<String, Object> reducing) {
+        return super.initializeReducingVariables(context, context.getWorkflowExectionContext().getWorkflowScratchVariables());
+    }
+
+    @Override
+    protected void initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, List<WorkflowExecutionContext> nestedWorkflowContext) {
+        // wouldn't work if we iterated in the sub-workflow; but it doesn't allow an iterable target
+        outerContext.getWorkflowExectionContext().getAllInput().forEach( (k,v) -> {
+            if (!outerContext.hasInput(k)) {
+                nestedWorkflowContext.forEach((c -> c.getAllInput().put(k, v)));
+            }
+        });
+    }
+
+    @Override
+    protected Map<String, Object> getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext outerContext, WorkflowExecutionContext lastRun, Map<String, Object> prevWorkflowVars) {
+        // wouldn't work if we iterated in the sub-workflow; but it doesn't allow an iterable target
+        lastRun.getAllInputResolved().forEach( (k,v) -> {
+            if (outerContext.getWorkflowExectionContext().hasInput(k)) outerContext.getWorkflowExectionContext().noteInputResolved(k, v);
+        });
+
+        outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(lastRun.getWorkflowScratchVariables());
+
+        // output should just be last step, not the reduced vars
+        return null;
+    }
+}
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
index 9c4bd30..dba25bf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
@@ -77,6 +77,7 @@
 import org.apache.brooklyn.core.workflow.steps.flow.RetryWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.ReturnWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.SleepWorkflowStep;
+import org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.SwitchWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.variables.ClearVariableWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.variables.LoadWorkflowStep;
@@ -155,6 +156,7 @@
 
         addRegisteredTypeBean(mgmt, "retry", RetryWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "workflow", CustomWorkflowStep.class);
+        addRegisteredTypeBean(mgmt, "subworkflow", SubWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "foreach", ForeachWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "ssh", SshWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "shell", ShellWorkflowStep.class);
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index 36667e9..20f59d5 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.brooklyn.core.workflow;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
 import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
@@ -61,13 +67,6 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
 public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<TestApplication> {
 
     private static final Logger log = LoggerFactory.getLogger(WorkflowNestedAndCustomExtensionTest.class);
@@ -174,8 +173,15 @@
 
         Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging(MutableList.of(
                         MutableMap.of("type", "log-hi",
-                                "steps", MutableList.of("return not allowed to override")))),
-                e -> Asserts.expectedFailureContainsIgnoreCase(e, "steps"));
+                                "steps", MutableList.of("return should have failed because not allowed to override")))),
+                Asserts.expectedFailureContainsIgnoreCase("error", "in definition", "step 1", "steps=", "should have failed"));
+
+        Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging(MutableList.of(
+                        "log-hi")),
+                Asserts.expectedFailureContainsIgnoreCase("evaluated to null or missing", "name").and(
+                        Asserts.expectedFailureDoesNotContainIgnoreCase("recursive")
+                )
+        );
     }
 
     @Test
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
new file mode 100644
index 0000000..dc6fe61
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.typereg.RegisteredType;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypePlanTransformer;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.typereg.BasicTypeImplementationPlan;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+public class WorkflowSubAndCustomExtensionEdgeTest extends RebindTestFixture<TestApplication> {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkflowSubAndCustomExtensionEdgeTest.class);
+
+    @Override
+    protected LocalManagementContext decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
+        WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+        app = null; // clear this
+        mgmt.getBrooklynProperties().put(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT, "forever");
+        return super.decorateOrigOrNewManagementContext(mgmt);
+    }
+
+    @Override
+    protected TestApplication createApp() {
+        return null;
+    }
+
+    @Override protected TestApplication rebind() throws Exception {
+        return rebind(RebindOptions.create().terminateOrigManagementContext(true));
+    }
+
+    public RegisteredType addBeanWithType(String typeName, String version, String plan) {
+        return BrooklynAppUnitTestSupport.addRegisteredTypeBean(mgmt(), typeName, version,
+                new BasicTypeImplementationPlan(BeanWithTypePlanTransformer.FORMAT, plan));
+    }
+
+    TestApplication app;
+    Task<?> lastInvocation;
+
+    Object runWorkflow(List<Object> steps) throws Exception {
+        return runWorkflow(steps, null);
+    }
+    Object runWorkflow(List<Object> steps, ConfigBag extraEffectorConfig) throws Exception {
+        if (app==null) app = mgmt().getEntityManager().createEntity(EntitySpec.create(TestApplication.class));
+        WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+                .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+                .configure(WorkflowEffector.STEPS, steps)
+                .putAll(extraEffectorConfig));
+        eff.apply((EntityLocal)app);
+
+        lastInvocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
+        return lastInvocation.getUnchecked();
+    }
+
+    @Test
+    public void testVisibilityOfReducingWhenDefiningCustomWorkflowStep() throws Exception {
+        // test the fixture is right
+        MutableMap<String, Serializable> basicReducingStep = MutableMap.of(
+                "type", "workflow",
+                "reducing", MutableMap.of("hello_word", "${hello_word}"),
+                "steps", MutableList.of("set-sensor hi = ${hello_word} world"));
+        runWorkflow(MutableList.of("let hello_word = HI", basicReducingStep));
+        EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(String.class, "hi"), "HI world");
+
+        addBeanWithType("set-sensor-hi-reducing", "1-SNAPSHOT", Strings.lines(
+                "type: workflow",
+                "parameters:",
+                "  value: {}",
+                "reducing:",
+                "  hello_word: ${hello_word}",
+                "steps:",
+                "  - let hi_word = ${hello_word} ?? hi",
+                "  - set-sensor hi = ${hi_word} ${value}",
+                "  - let hello_word = bye"
+        ));
+
+        if (CustomWorkflowStep.CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE) {
+            runWorkflow(MutableList.of(
+                    "let hello_word = HELLO",
+                    MutableMap.of("type", "set-sensor-hi-reducing", "input", MutableMap.of("value", "bob")),
+                    "return ${hello_word}"));
+            Asserts.assertEquals(lastInvocation.getUnchecked(), "bye");
+            EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(String.class, "hi"), "HELLO bob");
+        } else {
+            Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+                    "let hello_word = HELLO",
+                    MutableMap.of("type", "set-sensor-hi-reducing", "input", MutableMap.of("value", "bob")),
+                    "return ${hello_word}")),
+                Asserts.expectedFailureContainsIgnoreCase("not permitted", "reducing"));
+        }
+
+        addBeanWithType("set-sensor-hi", "1-SNAPSHOT", Strings.lines(
+                "type: workflow",
+                "parameters:",
+                "  value: {}",
+                "steps:",
+                "  - set-sensor hi = hi ${value}"
+        ));
+        // value is a poor choice with set-sensor, because set-sensor tries to evaluate the input; but let's make sure the message is not too confusing
+        Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+                "let hello_word = HI",
+                "set-sensor-hi")),
+                Asserts.expectedFailureContainsIgnoreCase("recursive or missing reference","value")
+        );
+    }
+
+    @Test
+    public void testSubWorkflowStep() throws Exception {
+        runWorkflow(MutableList.of(
+                "let v1 = V1",
+                "let v2 = V2",
+                MutableMap.of("step", "subworkflow",
+                    "steps", MutableList.of(
+                            "let v0 = ${v0}B",
+                            "let v1 = ${v1}B",
+                            "let v3 = V3B",
+                            "return done")),
+                "return ${v0}-${v1}-${v2}-${v3}-${output}"),
+                ConfigBag.newInstance().configure(WorkflowCommonConfig.INPUT, MutableMap.of("v0", "V0")) );
+        Asserts.assertEquals(lastInvocation.getUnchecked(), "V0B-V1B-V2-V3B-done");
+    }
+
+}
diff --git a/karaf/init/src/main/resources/catalog.bom b/karaf/init/src/main/resources/catalog.bom
index ba10136..c606ff1 100644
--- a/karaf/init/src/main/resources/catalog.bom
+++ b/karaf/init/src/main/resources/catalog.bom
@@ -184,6 +184,11 @@
     itemType: bean
     item:
       type: org.apache.brooklyn.core.workflow.steps.flow.ForeachWorkflowStep
+  - id: subworkflow
+    format: java-type-name
+    itemType: bean
+    item:
+      type: org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep
   - id: retry
     format: java-type-name
     itemType: bean
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index fdfe11a..fedcd94 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1239,6 +1239,18 @@
             }
         }
     }
+
+    public static void assertStringDoesNotContainIgnoreCase(String input, String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        if (input==null) fail("Input is null.");
+        if (phrase1ToNotContain!=null) {
+            assertThat(input, Predicates.not(StringPredicates.containsLiteralIgnoreCase(phrase1ToNotContain)));
+        }
+        for (String otherPhrase: optionalOtherPhrasesToNotContain) {
+            if (otherPhrase!=null) {
+                assertThat(input, Predicates.not(StringPredicates.containsLiteralIgnoreCase(otherPhrase)));
+            }
+        }
+    }
     
     public static void assertStringMatchesRegex(String input, String regex1ToMatch, String ...optionalOtherRegexesToMatch) {
         if (input==null) fail("Input is null.");
@@ -1369,7 +1381,22 @@
         }
         return true;
     }
-    
+    public static boolean expectedFailureDoesNotContainIgnoreCase(Throwable e, String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
+        try {
+            assertStringDoesNotContainIgnoreCase(Exceptions.collapseText(e), phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+        } catch (AssertionError ee) {
+            rethrowPreferredException(e, ee);
+        }
+        return true;
+    }
+    public static Predicate<Throwable> expectedFailureDoesNotContain( String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        return e -> expectedFailureDoesNotContain(e, phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+    }
+    public static Predicate<Throwable> expectedFailureDoesNotContainIgnoreCase(String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        return e -> expectedFailureDoesNotContainIgnoreCase(e, phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+    }
+
     /** Implements the return behavior for {@link #expectedFailureOfType(Throwable, Class, Class...)} and others,
      * to log interesting earlier errors but to suppress those which are internal or redundant. */
     private static void rethrowPreferredException(Throwable earlierPreferredIfFatalElseLogged, Throwable laterPreferredOtherwise) throws AssertionError {