Merge branch 'workflow-size-optimizations'
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 14e69e0..26fc66d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -18,10 +18,10 @@
  */
 package org.apache.brooklyn.core.workflow;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.common.collect.Iterables;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
@@ -81,6 +81,7 @@
 import static org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT;
 
 @JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonDeserialize(converter = WorkflowExecutionContext.Converter.class)
 public class WorkflowExecutionContext {
 
     private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionContext.class);
@@ -145,8 +146,33 @@
     Map<String,Object> inputResolved = MutableMap.of();
 
     Object outputDefinition;
+    /** final output of the workflow, set at end */
     Object output;
 
+    /*
+     * Tricks to keep size of persisted/serialized data down:
+     *
+     * * this.output set at the end (as a copy)
+     *   NEW: only set if different, on lookup if null, if finished, get from last step
+     * * step.output set on completion of each step (copying previous if no explicit output)
+//     *   - if never run
+//     *     - if different to previous step, set
+//     *     - if same as previous step, set null
+//     *   - if run before
+//     *     - if old value equals new value, do nothing
+//     *     - if old value different
+     *       - if new value is same as previous step, set null, else set new value
+     *       - & if last run's next step output was null, set it to the old value here
+     *   NEW: only set if different to previous step or not recoverable
+     *   - if different to previous step, set
+     *   - if there was a previous instance of this step: if we are different to previous instance of this step, set
+     *   - if same. has this step
+     *
+     * * this.workflowScratch - currently set on context dynamically, copied to oldStepInfo
+     *   NEW 1: set on context and oldStepInfo dynamically but null if same as previous, retrieved looking up previous
+     *   NEW 2: add up all the previous until it repeats
+     */
+
     Object lock;
 
     Duration timeout;
@@ -194,8 +220,10 @@
         WorkflowStepInstanceExecutionContext context;
         /** is step replayable */
         Boolean replayableFromHere;
-        /** scratch for last _started_ instance of step */
+        /** scratch vars as at start of last invocation of set _if_ they could not be derived from updates */
         Map<String,Object> workflowScratch;
+        /** updates to scratch vars made by the last run of this step */
+        Map<String,Object> workflowScratchUpdates;
         /** steps that immediately preceded this, updated when _this_ step started, with most recent first */
         Set<Integer> previous;
         /** steps that immediately followed this, updated when _next_ step started, with most recent first */
@@ -205,7 +233,14 @@
         String nextTaskId;
     }
 
-    Map<String,Object> workflowScratchVariables = MutableMap.of();
+    // when persisted, this is omitted and restored from the oldStepInfo map on read
+    transient Map<String,Object> workflowScratchVariables;
+    transient Map<String,Object> workflowScratchVariablesUpdatedThisStep;
+
+    @JsonSetter("workflowScratchVariables") //only used to read in old state which stored this
+    public void setWorkflowScratchVariablesToDeserializeOld(Map<String, Object> workflowScratchVariables) {
+        this.workflowScratchVariables = workflowScratchVariables;
+    }
 
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     Map<String,List<Instant>> retryRecords = MutableMap.of();
@@ -366,8 +401,31 @@
         return parentTag;
     }
 
+    @JsonIgnore
     public Map<String, Object> getWorkflowScratchVariables() {
-        return workflowScratchVariables;
+        if (workflowScratchVariables == null) {
+            Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null);
+            workflowScratchVariables = prev.getLeft();
+        }
+        return MutableMap.copyOf(workflowScratchVariables).asUnmodifiable();
+    }
+
+    public Object updateWorkflowScratchVariable(String s, Object v) {
+        if (workflowScratchVariables ==null) getWorkflowScratchVariables();
+        Object old = workflowScratchVariables.put(s, v);
+        if (v==null) workflowScratchVariables.remove(s);
+        if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of();
+        workflowScratchVariablesUpdatedThisStep.put(s, v);
+        return old;
+    }
+
+    public void updateWorkflowScratchVariables(Map<String,Object> newValues) {
+        if (newValues!=null && !newValues.isEmpty()) {
+            if (workflowScratchVariables ==null) getWorkflowScratchVariables();
+            workflowScratchVariables.putAll(newValues);
+            if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of();
+            workflowScratchVariablesUpdatedThisStep.putAll(newValues);
+        }
     }
 
     public Map<String, List<Instant>> getRetryRecords() {
@@ -660,18 +718,61 @@
 
     @JsonIgnore
     public Object getPreviousStepOutput() {
-        if (lastErrorHandlerOutput!=null) return lastErrorHandlerOutput;
-        if (previousStepIndex==null) return null;
+        Pair<Object, Set<Integer>> p = getStepOutputAndBacktrackedSteps(null);
+        if (p==null) return null;
+        return p.getLeft();
+    }
+    @JsonIgnore
+    Pair<Object,Set<Integer>> getStepOutputAndBacktrackedSteps(Integer stepOrNullForPrevious) {
+        if (stepOrNullForPrevious==null && lastErrorHandlerOutput!=null) return Pair.of(lastErrorHandlerOutput,null);
 
-        OldStepRecord last = oldStepInfo.get(previousStepIndex);
-        if (last!=null && last.context!=null) return last.context.output;
+        Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious;
+        Set<Integer> previousSteps = MutableSet.of();
+        while (prevSI!=null && previousSteps.add(prevSI)) {
+            OldStepRecord last = oldStepInfo.get(prevSI);
+            if (last==null || last.context==null) break;
+            if (last.context.getOutput() !=null) return Pair.of(last.context.getOutput(), previousSteps);
+            if (last.previous.isEmpty()) break;
+            prevSI = last.previous.iterator().next();
+        }
         return null;
     }
 
+    @JsonIgnore
+    public Pair<Map<String,Object>,Set<Integer>> getStepWorkflowScratchAndBacktrackedSteps(Integer stepOrNullForPrevious) {
+        Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious;
+        Set<Integer> previousSteps = MutableSet.of();
+        Map<String,Object> result = MutableMap.of();
+        boolean includeUpdates = stepOrNullForPrevious==null;  // exclude first update if getting at an explicit step
+        while (prevSI!=null && previousSteps.add(prevSI)) {
+            OldStepRecord last = oldStepInfo.get(prevSI);
+            if (last==null) break;
+            if (includeUpdates && last.workflowScratchUpdates !=null) {
+                result = MutableMap.copyOf(last.workflowScratchUpdates).add(result);
+            }
+            includeUpdates = true;
+            if (last.workflowScratch !=null) {
+                result = MutableMap.copyOf(last.workflowScratch).add(result);
+                result.entrySet().stream().filter(e -> e.getValue()==null).map(Map.Entry::getKey).forEach(result::remove);
+                break;
+            }
+            if (last.previous==null || last.previous.isEmpty()) break;
+            prevSI = last.previous.iterator().next();
+        }
+        return Pair.of(result, previousSteps);
+    }
+
     public Object getOutput() {
         return output;
     }
 
+    public static void checkEqual(Object o1, Object o2) {
+        if (!Objects.equals(o1, o2)) {
+            log.warn("Objects different: " + o1 + " / " + o2);
+            throw new IllegalStateException("Objects different: " + o1 + " / " + o2);
+        }
+    }
+
     public String getName() {
         return name;
     }
@@ -961,7 +1062,7 @@
                                 if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) {
                                     throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance);
                                 }
-                                currentStepInstance.output = null;
+                                currentStepInstance.setOutput(null);
                                 currentStepInstance.injectContext(WorkflowExecutionContext.this);
 
                                 log.debug("Replaying workflow '" + name + "', reusing instance " + currentStepInstance + " for step " + workflowStepReference(currentStepIndex) + ")");
@@ -1084,7 +1185,9 @@
                         errorHandled = true;
 
                         currentStepInstance.next = WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END);
-                        if (result.output != null) output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);
+                        if (result.output != null) {
+                            output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);
+                        }
 
                         moveToNextStep("Handled error in workflow around step " + workflowStepReference(currentStepIndex), result.next==null);
 
@@ -1141,13 +1244,13 @@
             // record how it ended
             oldStepInfo.compute(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
                 if (old == null) old = new OldStepRecord();
-                old.next = MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next);
+                old.next = MutableSet.of(STEP_INDEX_FOR_END).putAll(old.next);
                 old.nextTaskId = null;
                 return old;
             });
             oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> {
                 if (old == null) old = new OldStepRecord();
-                old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
+                old.previous = MutableSet.of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
                 old.previousTaskId = previousStepTaskId;
                 return old;
             });
@@ -1160,7 +1263,7 @@
             }
             OldStepRecord last = oldStepInfo.get(step);
             if (last != null) {
-                workflowScratchVariables = last.workflowScratch;
+                workflowScratchVariables = getStepWorkflowScratchAndBacktrackedSteps(step).getLeft();
                 previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null);
 
             } else {
@@ -1207,7 +1310,7 @@
                 resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false);
             }
             if (continuationInstructions.customWorkflowScratchVariables!=null) {
-                workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables);
+                updateWorkflowScratchVariables(continuationInstructions.customWorkflowScratchVariables);
             }
 
         }
@@ -1253,7 +1356,7 @@
         }
 
         private Object endWithSuccess() {
-            WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, output);
+            WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, getOutput());
             persist();
             return output;
         }
@@ -1316,13 +1419,15 @@
                 t = step.newTask(currentStepInstance);
             }
 
+            updateOldNextStepOnThisStepStarting();
+
             // about to run -- checkpoint noting current and previous steps, and updating replayable from info
             OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> {
                 if (old == null) old = new OldStepRecord();
                 old.countStarted++;
-                if (!workflowScratchVariables.isEmpty())
-                    old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
-                else old.workflowScratch = null;
+
+                old.workflowScratchUpdates = null;
+
                 old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
                 old.previousTaskId = previousStepTaskId;
                 old.nextTaskId = null;
@@ -1331,6 +1436,10 @@
             WorkflowReplayUtils.updateReplayableFromStep(WorkflowExecutionContext.this, step);
             oldStepInfo.compute(previousStepIndex==null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
                 if (old==null) old = new OldStepRecord();
+                if (previousStepIndex==null && workflowScratchVariables!=null && !workflowScratchVariables.isEmpty()) {
+                    // if workflow scratch vars were initialized prior to run, we nee to save those
+                    old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
+                }
                 old.next = MutableSet.<Integer>of(currentStepIndex).putAll(old.next);
                 old.nextTaskId = t.getId();
                 return old;
@@ -1343,25 +1452,49 @@
 
             persist();
 
-            BiConsumer<Object,Object> onFinish = (output,overrideNext) -> {
+            BiConsumer<Object,Object> onFinish = (stepOutputDefinition,overrideNext) -> {
                 currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step);
-                if (output!=null) currentStepInstance.output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, output, Object.class);
+                if (stepOutputDefinition!=null) {
+                    Object outputResolved = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, stepOutputDefinition, Object.class);
+                    currentStepInstance.setOutput(outputResolved);
+                }
+                if (currentStepInstance.output != null) {
+                    Pair<Object, Set<Integer>> prev = getStepOutputAndBacktrackedSteps(null);
+                    if (prev != null && Objects.equals(prev.getLeft(), currentStepInstance.output) && lastErrorHandlerOutput==null) {
+                        // optimization, clear the value here if we can simply take it from the previous step
+                        currentStepInstance.output = null;
+                    }
+                }
+                if (workflowScratchVariablesUpdatedThisStep!=null && !workflowScratchVariablesUpdatedThisStep.isEmpty()) {
+                    currentStepRecord.workflowScratchUpdates = workflowScratchVariablesUpdatedThisStep;
+                }
+                if (currentStepRecord.workflowScratch != null) {
+                    // if we are repeating, check if we need to keep what we were repeating
+                    Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null);
+                    if (prev!=null && !prev.getRight().contains(currentStepIndex) && Objects.equals(prev.getLeft(), currentStepRecord.workflowScratch)){
+                        currentStepRecord.workflowScratch = null;
+                    }
+                }
+
+                workflowScratchVariablesUpdatedThisStep = null;
             };
 
             // now run the step
             try {
                 Duration duration = step.getTimeout();
+                Object newOutput;
                 if (duration!=null) {
                     boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration);
                     if (isEnded) {
-                        currentStepInstance.output = t.getUnchecked();
+                        newOutput = t.getUnchecked();
                     } else {
                         t.cancel(true);
                         throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName());
                     }
                 } else {
-                    currentStepInstance.output = DynamicTasks.queue(t).getUnchecked();
+                    newOutput = DynamicTasks.queue(t).getUnchecked();
                 }
+                currentStepInstance.setOutput(newOutput);
 
                 // allow output to be customized / overridden
                 onFinish.accept(step.output, null);
@@ -1453,6 +1586,28 @@
         }
     }
 
+    private void updateStepOutput(WorkflowStepInstanceExecutionContext step, Object newOutput) {
+        step.output = step.outputOld = newOutput;
+    }
+    private void updateOldNextStepOnThisStepStarting() {
+        // at step start, we update the _next_ record to have a copy of our old output and workflow vars
+        OldStepRecord old = oldStepInfo.get(currentStepInstance.stepIndex);
+        if (old!=null && old.next!=null && !old.next.isEmpty()) {
+            Integer lastNext = old.next.iterator().next();
+            OldStepRecord oldNext = oldStepInfo.get(lastNext);
+            if (oldNext!=null && oldNext.context!=null) {
+                // if oldNext has no context then we never ran it, so we aren't repeating, we're replaying
+                if (oldNext.context.output ==null) {
+                    // below will access the _previous_ StepInstanceExecutionContext, as oldStepRecord.context is update at end of step
+                    // thus below gets the _previous_ output known at this step, saving it in the next
+                    oldNext.context.output = old.context.output;
+                }
+
+                oldNext.workflowScratch = getWorkflowScratchVariables();
+            }
+        }
+    }
+
     public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) {
         if (next==null) return Maybe.absent("Null step ID supplied");
         Function<WorkflowExecutionContext, Integer> predefined = PREDEFINED_NEXT_TARGETS.get(next.toLowerCase());
@@ -1494,4 +1649,24 @@
         return "neg-"+(index); // unknown
     }
 
+    public static class Converter implements com.fasterxml.jackson.databind.util.Converter<WorkflowExecutionContext,WorkflowExecutionContext> {
+        @Override
+        public WorkflowExecutionContext convert(WorkflowExecutionContext value) {
+            if (value.workflowScratchVariables ==null || value.workflowScratchVariables.isEmpty()) {
+                value.workflowScratchVariables = value.getStepWorkflowScratchAndBacktrackedSteps(null).getLeft();
+            }
+            // note: no special handling needed for output; it is derived from the last non-null step output
+            return value;
+        }
+
+        @Override
+        public JavaType getInputType(TypeFactory typeFactory) {
+            return typeFactory.constructType(WorkflowExecutionContext.class);
+        }
+
+        @Override
+        public JavaType getOutputType(TypeFactory typeFactory) {
+            return typeFactory.constructType(WorkflowExecutionContext.class);
+        }
+    }
 }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index d9fad13..b3e6c05 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -41,6 +41,7 @@
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.javalang.Boxing;
 import org.apache.brooklyn.util.time.Time;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,24 +113,26 @@
             }
 
             if ("output".equals(key)) {
-                if (context.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.output);
-                if (context.currentStepInstance!=null && context.currentStepInstance.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.output);
-                if (context.getPreviousStepOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getPreviousStepOutput());
+                if (context.getOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getOutput());
+                if (context.currentStepInstance!=null && context.currentStepInstance.getOutput() !=null) return TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.getOutput());
+                Object previousStepOutput = context.getPreviousStepOutput();
+                if (previousStepOutput!=null) return TemplateProcessor.wrapAsTemplateModel(previousStepOutput);
                 return ifNoMatches();
             }
 
             Object candidate;
 
-            //workflow.current_step.input.somevar
             if (stage.after(WorkflowExpressionStage.STEP_PRE_INPUT)) {
+                //somevar -> workflow.current_step.output.somevar
                 WorkflowStepInstanceExecutionContext currentStep = context.currentStepInstance;
                 if (currentStep != null && stage.after(WorkflowExpressionStage.STEP_OUTPUT)) {
-                    if (currentStep.output instanceof Map) {
-                        candidate = ((Map) currentStep.output).get(key);
+                    if (currentStep.getOutput() instanceof Map) {
+                        candidate = ((Map) currentStep.getOutput()).get(key);
                         if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
                     }
                 }
 
+                //somevar -> workflow.current_step.input.somevar
                 try {
                     candidate = currentStep.getInput(key, Object.class);
                 } catch (Throwable t) {
@@ -186,7 +189,7 @@
 
             //workflow.scratch.somevar
             if (stage.after(WorkflowExpressionStage.WORKFLOW_INPUT)) {
-                candidate = context.workflowScratchVariables.get(key);
+                candidate = context.getWorkflowScratchVariables().get(key);
                 if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
             }
 
@@ -230,7 +233,7 @@
             if ("error".equals(key)) return TemplateProcessor.wrapAsTemplateModel(errorHandlerContext!=null ? errorHandlerContext.error : null);
 
             if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.input);
-            if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.output);
+            if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.getOutput());
 
             //current_step.yyy and previous_step.yyy (where yyy is any of the above)
             //step.xxx.yyy ? - where yyy is any of the above and xxx any step id
@@ -240,7 +243,7 @@
             if ("step".equals(key)) return new WorkflowStepModel();
             if ("util".equals(key)) return new WorkflowUtilModel();
 
-            if ("var".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.workflowScratchVariables);
+            if ("var".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.getWorkflowScratchVariables());
 
             return ifNoMatches();
         }
@@ -290,7 +293,9 @@
 
             if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(step.input);
             if ("output".equals(key)) {
-                return TemplateProcessor.wrapAsTemplateModel(step.output != null ? step.output : MutableMap.of());
+                Pair<Object, Set<Integer>> outputOfStep = context.getStepOutputAndBacktrackedSteps(step.stepIndex);
+                Object output = (outputOfStep != null && outputOfStep.getLeft() != null) ? outputOfStep.getLeft() : MutableMap.of();
+                return TemplateProcessor.wrapAsTemplateModel(output);
             }
 
             return ifNoMatches();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 1342a1e..4f80f5d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -28,7 +28,6 @@
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,6 +89,7 @@
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     Set<BrooklynTaskTags.WorkflowTaskTag> subWorkflows = MutableSet.of();
 
+    transient Object outputOld;
     Object output;
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     public Map<String,Object> otherMetadata = MutableMap.of();
@@ -165,6 +165,7 @@
         return output;
     }
     public void setOutput(Object output) {
+        this.outputOld = output;
         this.output = output;
     }
 
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index d2580ea..1a4d5a9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -32,7 +32,6 @@
 import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
@@ -408,7 +407,7 @@
                 WorkflowExecutionContext wc = p.getLeft();
                 if (wc.getCurrentStepIndex()==null || wc.getCurrentStepIndex()==WorkflowExecutionContext.STEP_INDEX_FOR_START) {
                     // initialize to last if it hasn't started
-                    wc.getWorkflowScratchVariables().putAll(reducingV);
+                    wc.updateWorkflowScratchVariables(reducingV);
                 }
 
                 DynamicTasks.queue(p.getRight()).getUnchecked();
@@ -428,7 +427,7 @@
             returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
         } else {
             context.setOutput(reducingV);
-            context.getWorkflowExectionContext().getWorkflowScratchVariables().putAll(reducingV);
+            context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
             returnValue = reducingV;
         }
 
@@ -543,7 +542,7 @@
 
 
         String tivn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_index_var_name, String.class);
-        if (targetIndexOrNull!=null) nestedWorkflowContext.getWorkflowScratchVariables().put(tivn==null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull);
+        if (targetIndexOrNull!=null) nestedWorkflowContext.updateWorkflowScratchVariable(tivn == null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull);
         initializeSubWorkflowForTarget(context, target, nestedWorkflowContext);
 
         return nestedWorkflowContext;
@@ -551,7 +550,7 @@
 
     protected void initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, Object target, WorkflowExecutionContext nestedWorkflowContext) {
         String tvn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_var_name, String.class);
-        nestedWorkflowContext.getWorkflowScratchVariables().put(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target);
+        nestedWorkflowContext.updateWorkflowScratchVariable(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target);
     }
 
     /** Returns a top-level workflow running the workflow defined here */
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
index 4d9879b..3c179bc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
@@ -65,10 +65,10 @@
             if (tvn.startsWith("{") && tvn.endsWith("}")) {
                 String[] spreadVars = tvn.substring(1, tvn.length() - 1).split(",");
                 if (!(target instanceof Map)) throw new IllegalStateException("Spread vars indicated in foreach but target is not a map");
-                nestedWorkflowContext.getWorkflowScratchVariables().put(TARGET_VAR_NAME_DEFAULT, target);
+                nestedWorkflowContext.updateWorkflowScratchVariable(TARGET_VAR_NAME_DEFAULT, target);
                 for (String spreadVar: spreadVars) {
                     String svt = spreadVar.trim();
-                    nestedWorkflowContext.getWorkflowScratchVariables().put(svt, ((Map)target).get(svt));
+                    nestedWorkflowContext.updateWorkflowScratchVariable(svt, ((Map)target).get(svt));
                 }
                 return;
             }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
index 4ae6d21..3623c47 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
@@ -43,7 +43,7 @@
         if (variable ==null) throw new IllegalArgumentException("Variable name is required");
         String name = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, variable.name, String.class);
         if (Strings.isBlank(name)) throw new IllegalArgumentException("Variable name is required");
-        context.getWorkflowExectionContext().getWorkflowScratchVariables().remove(name);
+        context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, null);
         return context.getPreviousStepOutput();
     }
 
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
index 197aebe..da68001 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
@@ -89,7 +89,7 @@
 
         Object resolvedValue = new SetVariableWorkflowStep.ConfigurableInterpolationEvaluation(context, type, data, context.getInputOrDefault(INTERPOLATION_MODE), context.getInputOrDefault(INTERPOLATION_ERRORS)).evaluate();
 
-        context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+        context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
 
         context.noteOtherMetadata("Loaded", ByteSizeStrings.java().makeSizeString(data.getBytes().length)+" from "+url+" into "+variable);
         return context.getPreviousStepOutput();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
index 6531ba0..7da45ca 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
@@ -143,9 +143,9 @@
                 throw new IllegalArgumentException("Invalid list index " + listIndex);
             }
             oldValue = l.set(listIndex, resolvedValue);
-            context.getWorkflowExectionContext().getWorkflowScratchVariables().put(listName, l);
+            context.getWorkflowExectionContext().updateWorkflowScratchVariable(listName, l);
         } else {
-            oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+            oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
         }
         return oldValue;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
index db5be0b..3b597da 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
@@ -82,7 +82,7 @@
         log.debug("Wait resolved after "+duration+", "+input.get(unresolvedValue)+" is: "+resolvedValue);
 
         if (name!=null) {
-            Object oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+            Object oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
             if (oldValue!=null) context.noteOtherMetadata("Previous value", oldValue);
             context.noteOtherMetadata("Value set", resolvedValue);
             return context.getPreviousStepOutput();
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
index 128375e..16e88d8 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityInitializer;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -344,12 +343,12 @@
 
     @Test
     public void testWorkflowResolutionScratchVariable() {
-        doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "bar"), "${foo}", "bar");
+        doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "bar"), "${foo}", "bar");
     }
 
     @Test
     public void testWorkflowResolutionScratchVariableCoerced() {
-        doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "7"), "${foo}", "integer", 7);
+        doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "7"), "${foo}", "integer", 7);
     }
 
     @Test
@@ -359,7 +358,7 @@
 
     @Test
     public void testWorkflowResolutionMore() {
-        doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar");
+        doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar");
         doTestOfWorkflowVariable(context -> context.getEntity().config().set(ConfigKeys.newConfigKey(Object.class, "foo"), MutableMap.of("bar", "baz")), "${entity.config.foo.bar}", "baz");
     }
 
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index b788413..d95b053 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -873,7 +873,7 @@
         Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId()));  // should replace the one above
     }
 
-    @Test(groups="Integration")
+    @Test(groups="Integration")  // very slow
     public void testRetentionManyWaysIncludingDisabled() throws Exception {
         app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
 
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
new file mode 100644
index 0000000..ec81b25
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
+import org.apache.brooklyn.entity.stock.BasicApplication;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.FileOutputStream;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.Map;
+
+public class WorkflowSizeTest extends BrooklynMgmtUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkflowSizeTest.class);
+
+    private BasicApplication app;
+
+    protected void createAppWithEffector(List<?> steps) {
+        WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+
+        if (this.app!=null) throw new IllegalStateException("Already have an app");
+        this.app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
+        WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+                .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+                .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("param", null))
+                .configure(WorkflowEffector.STEPS, (List) steps)
+        );
+        eff.apply((EntityLocal)app);
+    }
+
+    @Test
+    public void testSizeOfAllSensors() throws JsonProcessingException {
+        createAppWithEffector(MutableList.of(
+                "let pc = ${param}",
+                "let map myMap = {}",
+                "transform param | prepend hello-",
+                "let myMap.a = ${param}",
+                "let myMap.b = ${output}",
+                "return ${myMap}"
+        ));
+
+        String sampleData = "sample data for testing something big\n";
+
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+
+        Map<String, Integer> sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result < 10*1000);
+
+        // print out the above, search for "something big" to see where the size is used
+        String out = BeanWithTypeUtils.newYamlMapper(mgmt, true, null, true).writeValueAsString(
+                app.sensors().get(Sensors.newSensor(Object.class, "internals.brooklyn.workflow")));
+        log.info("WORKFLOW IS:\n"+out);
+
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+        sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result < 20*1000);
+
+        // 100k payload now -> bumps sensor (json) size from 5k to 3MB (before any optimization)
+        // [xml persistence is less of an issue because it will use a shared reference]
+        // removing output which is identical to the previous gives minor savings (in this test): 3380416 -> 3176074
+        // removing scratch at workflow which matches a step reduces further: -> 2869522
+        // switching to model where scratch is produced incrementally reduces much more -> 1847848
+        for (int i=0; i<1000; i++) {
+            for (int j=0; j<10; j++) sampleData += "0123456789";
+            sampleData += "\n";
+        }
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+        sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result > 100*1000);
+    }
+
+    protected Map<String,Integer> getSensorSizes() {
+        //Dumper.dumpInfo(app);
+        Map<String,Integer> sizes = MutableMap.of();
+        for (int retryWhileCME=0; ; retryWhileCME++) {
+            try {
+                sizes.clear();
+                app.sensors().getAll().forEach((k, v) -> {
+                    try {
+                        sizes.put(k.getName(), BeanWithTypeUtils.newMapper(mgmt, false, null, false).writeValueAsString(v).length());
+                    } catch (JsonProcessingException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                });
+                break;
+            } catch (Exception e) {
+                boolean allowedToRetry = false;
+                allowedToRetry |= Exceptions.getFirstThrowableOfType(e, ConcurrentModificationException.class)!=null;
+                allowedToRetry |= Exceptions.getFirstThrowableOfType(e, NullPointerException.class)!=null;
+                if (allowedToRetry && retryWhileCME<10) {
+                    log.info("Serializing sensors failed; will retry: "+e);
+                    Time.sleep(100);
+                    continue;
+                }
+                throw Exceptions.propagate(e);
+            }
+        }
+        return sizes;
+    }
+}