Merge branch 'workflow-size-optimizations'
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 14e69e0..26fc66d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -18,10 +18,10 @@
*/
package org.apache.brooklyn.core.workflow;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
@@ -81,6 +81,7 @@
import static org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT;
@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonDeserialize(converter = WorkflowExecutionContext.Converter.class)
public class WorkflowExecutionContext {
private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionContext.class);
@@ -145,8 +146,33 @@
Map<String,Object> inputResolved = MutableMap.of();
Object outputDefinition;
+ /** final output of the workflow, set at end */
Object output;
+ /*
+ * Tricks to keep size of persisted/serialized data down:
+ *
+ * * this.output set at the end (as a copy)
+ * NEW: only set if different, on lookup if null, if finished, get from last step
+ * * step.output set on completion of each step (copying previous if no explicit output)
+// * - if never run
+// * - if different to previous step, set
+// * - if same as previous step, set null
+// * - if run before
+// * - if old value equals new value, do nothing
+// * - if old value different
+ * - if new value is same as previous step, set null, else set new value
+ * - & if last run's next step output was null, set it to the old value here
+ * NEW: only set if different to previous step or not recoverable
+ * - if different to previous step, set
+ * - if there was a previous instance of this step: if we are different to previous instance of this step, set
+ * - if same. has this step
+ *
+ * * this.workflowScratch - currently set on context dynamically, copied to oldStepInfo
+ * NEW 1: set on context and oldStepInfo dynamically but null if same as previous, retrieved looking up previous
+ * NEW 2: add up all the previous until it repeats
+ */
+
Object lock;
Duration timeout;
@@ -194,8 +220,10 @@
WorkflowStepInstanceExecutionContext context;
/** is step replayable */
Boolean replayableFromHere;
- /** scratch for last _started_ instance of step */
+ /** scratch vars as at the start of the last invocation of this step, stored only _if_ they could not be derived from updates */
Map<String,Object> workflowScratch;
+ /** updates to scratch vars made by the last run of this step */
+ Map<String,Object> workflowScratchUpdates;
/** steps that immediately preceded this, updated when _this_ step started, with most recent first */
Set<Integer> previous;
/** steps that immediately followed this, updated when _next_ step started, with most recent first */
@@ -205,7 +233,14 @@
String nextTaskId;
}
- Map<String,Object> workflowScratchVariables = MutableMap.of();
+ // when persisted, this is omitted and restored from the oldStepInfo map on read
+ transient Map<String,Object> workflowScratchVariables;
+ transient Map<String,Object> workflowScratchVariablesUpdatedThisStep;
+
+ @JsonSetter("workflowScratchVariables") //only used to read in old state which stored this
+ public void setWorkflowScratchVariablesToDeserializeOld(Map<String, Object> workflowScratchVariables) {
+ this.workflowScratchVariables = workflowScratchVariables;
+ }
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String,List<Instant>> retryRecords = MutableMap.of();
@@ -366,8 +401,31 @@
return parentTag;
}
+ @JsonIgnore
public Map<String, Object> getWorkflowScratchVariables() {
- return workflowScratchVariables;
+ if (workflowScratchVariables == null) {
+ Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null);
+ workflowScratchVariables = prev.getLeft();
+ }
+ return MutableMap.copyOf(workflowScratchVariables).asUnmodifiable();
+ }
+
+ public Object updateWorkflowScratchVariable(String s, Object v) {
+ if (workflowScratchVariables ==null) getWorkflowScratchVariables();
+ Object old = workflowScratchVariables.put(s, v);
+ if (v==null) workflowScratchVariables.remove(s);
+ if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of();
+ workflowScratchVariablesUpdatedThisStep.put(s, v);
+ return old;
+ }
+
+ public void updateWorkflowScratchVariables(Map<String,Object> newValues) {
+ if (newValues!=null && !newValues.isEmpty()) {
+ if (workflowScratchVariables ==null) getWorkflowScratchVariables();
+ workflowScratchVariables.putAll(newValues);
+ if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of();
+ workflowScratchVariablesUpdatedThisStep.putAll(newValues);
+ }
}
public Map<String, List<Instant>> getRetryRecords() {
@@ -660,18 +718,61 @@
@JsonIgnore
public Object getPreviousStepOutput() {
- if (lastErrorHandlerOutput!=null) return lastErrorHandlerOutput;
- if (previousStepIndex==null) return null;
+ Pair<Object, Set<Integer>> p = getStepOutputAndBacktrackedSteps(null);
+ if (p==null) return null;
+ return p.getLeft();
+ }
+ @JsonIgnore
+ Pair<Object,Set<Integer>> getStepOutputAndBacktrackedSteps(Integer stepOrNullForPrevious) {
+ if (stepOrNullForPrevious==null && lastErrorHandlerOutput!=null) return Pair.of(lastErrorHandlerOutput,null);
- OldStepRecord last = oldStepInfo.get(previousStepIndex);
- if (last!=null && last.context!=null) return last.context.output;
+ Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious;
+ Set<Integer> previousSteps = MutableSet.of();
+ while (prevSI!=null && previousSteps.add(prevSI)) {
+ OldStepRecord last = oldStepInfo.get(prevSI);
+ if (last==null || last.context==null) break;
+ if (last.context.getOutput() !=null) return Pair.of(last.context.getOutput(), previousSteps);
+ if (last.previous.isEmpty()) break;
+ prevSI = last.previous.iterator().next();
+ }
return null;
}
+ @JsonIgnore
+ public Pair<Map<String,Object>,Set<Integer>> getStepWorkflowScratchAndBacktrackedSteps(Integer stepOrNullForPrevious) {
+ Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious;
+ Set<Integer> previousSteps = MutableSet.of();
+ Map<String,Object> result = MutableMap.of();
+ boolean includeUpdates = stepOrNullForPrevious==null; // exclude first update if getting at an explicit step
+ while (prevSI!=null && previousSteps.add(prevSI)) {
+ OldStepRecord last = oldStepInfo.get(prevSI);
+ if (last==null) break;
+ if (includeUpdates && last.workflowScratchUpdates !=null) {
+ result = MutableMap.copyOf(last.workflowScratchUpdates).add(result);
+ }
+ includeUpdates = true;
+ if (last.workflowScratch !=null) {
+ result = MutableMap.copyOf(last.workflowScratch).add(result);
+ result.entrySet().stream().filter(e -> e.getValue()==null).map(Map.Entry::getKey).forEach(result::remove);
+ break;
+ }
+ if (last.previous==null || last.previous.isEmpty()) break;
+ prevSI = last.previous.iterator().next();
+ }
+ return Pair.of(result, previousSteps);
+ }
+
public Object getOutput() {
return output;
}
+ public static void checkEqual(Object o1, Object o2) {
+ if (!Objects.equals(o1, o2)) {
+ log.warn("Objects different: " + o1 + " / " + o2);
+ throw new IllegalStateException("Objects different: " + o1 + " / " + o2);
+ }
+ }
+
public String getName() {
return name;
}
@@ -961,7 +1062,7 @@
if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) {
throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance);
}
- currentStepInstance.output = null;
+ currentStepInstance.setOutput(null);
currentStepInstance.injectContext(WorkflowExecutionContext.this);
log.debug("Replaying workflow '" + name + "', reusing instance " + currentStepInstance + " for step " + workflowStepReference(currentStepIndex) + ")");
@@ -1084,7 +1185,9 @@
errorHandled = true;
currentStepInstance.next = WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END);
- if (result.output != null) output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);
+ if (result.output != null) {
+ output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);
+ }
moveToNextStep("Handled error in workflow around step " + workflowStepReference(currentStepIndex), result.next==null);
@@ -1141,13 +1244,13 @@
// record how it ended
oldStepInfo.compute(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
if (old == null) old = new OldStepRecord();
- old.next = MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next);
+ old.next = MutableSet.of(STEP_INDEX_FOR_END).putAll(old.next);
old.nextTaskId = null;
return old;
});
oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> {
if (old == null) old = new OldStepRecord();
- old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
+ old.previous = MutableSet.of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
old.previousTaskId = previousStepTaskId;
return old;
});
@@ -1160,7 +1263,7 @@
}
OldStepRecord last = oldStepInfo.get(step);
if (last != null) {
- workflowScratchVariables = last.workflowScratch;
+ workflowScratchVariables = getStepWorkflowScratchAndBacktrackedSteps(step).getLeft();
previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null);
} else {
@@ -1207,7 +1310,7 @@
resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false);
}
if (continuationInstructions.customWorkflowScratchVariables!=null) {
- workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables);
+ updateWorkflowScratchVariables(continuationInstructions.customWorkflowScratchVariables);
}
}
@@ -1253,7 +1356,7 @@
}
private Object endWithSuccess() {
- WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, output);
+ WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, getOutput());
persist();
return output;
}
@@ -1316,13 +1419,15 @@
t = step.newTask(currentStepInstance);
}
+ updateOldNextStepOnThisStepStarting();
+
// about to run -- checkpoint noting current and previous steps, and updating replayable from info
OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> {
if (old == null) old = new OldStepRecord();
old.countStarted++;
- if (!workflowScratchVariables.isEmpty())
- old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
- else old.workflowScratch = null;
+
+ old.workflowScratchUpdates = null;
+
old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
old.previousTaskId = previousStepTaskId;
old.nextTaskId = null;
@@ -1331,6 +1436,10 @@
WorkflowReplayUtils.updateReplayableFromStep(WorkflowExecutionContext.this, step);
oldStepInfo.compute(previousStepIndex==null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
if (old==null) old = new OldStepRecord();
+ if (previousStepIndex==null && workflowScratchVariables!=null && !workflowScratchVariables.isEmpty()) {
+ // if workflow scratch vars were initialized prior to run, we need to save those
+ old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
+ }
old.next = MutableSet.<Integer>of(currentStepIndex).putAll(old.next);
old.nextTaskId = t.getId();
return old;
@@ -1343,25 +1452,49 @@
persist();
- BiConsumer<Object,Object> onFinish = (output,overrideNext) -> {
+ BiConsumer<Object,Object> onFinish = (stepOutputDefinition,overrideNext) -> {
currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step);
- if (output!=null) currentStepInstance.output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, output, Object.class);
+ if (stepOutputDefinition!=null) {
+ Object outputResolved = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, stepOutputDefinition, Object.class);
+ currentStepInstance.setOutput(outputResolved);
+ }
+ if (currentStepInstance.output != null) {
+ Pair<Object, Set<Integer>> prev = getStepOutputAndBacktrackedSteps(null);
+ if (prev != null && Objects.equals(prev.getLeft(), currentStepInstance.output) && lastErrorHandlerOutput==null) {
+ // optimization, clear the value here if we can simply take it from the previous step
+ currentStepInstance.output = null;
+ }
+ }
+ if (workflowScratchVariablesUpdatedThisStep!=null && !workflowScratchVariablesUpdatedThisStep.isEmpty()) {
+ currentStepRecord.workflowScratchUpdates = workflowScratchVariablesUpdatedThisStep;
+ }
+ if (currentStepRecord.workflowScratch != null) {
+ // if we are repeating, check if we need to keep what we were repeating
+ Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null);
+ if (prev!=null && !prev.getRight().contains(currentStepIndex) && Objects.equals(prev.getLeft(), currentStepRecord.workflowScratch)){
+ currentStepRecord.workflowScratch = null;
+ }
+ }
+
+ workflowScratchVariablesUpdatedThisStep = null;
};
// now run the step
try {
Duration duration = step.getTimeout();
+ Object newOutput;
if (duration!=null) {
boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration);
if (isEnded) {
- currentStepInstance.output = t.getUnchecked();
+ newOutput = t.getUnchecked();
} else {
t.cancel(true);
throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName());
}
} else {
- currentStepInstance.output = DynamicTasks.queue(t).getUnchecked();
+ newOutput = DynamicTasks.queue(t).getUnchecked();
}
+ currentStepInstance.setOutput(newOutput);
// allow output to be customized / overridden
onFinish.accept(step.output, null);
@@ -1453,6 +1586,28 @@
}
}
+ private void updateStepOutput(WorkflowStepInstanceExecutionContext step, Object newOutput) {
+ step.output = step.outputOld = newOutput;
+ }
+ private void updateOldNextStepOnThisStepStarting() {
+ // at step start, we update the _next_ record to have a copy of our old output and workflow vars
+ OldStepRecord old = oldStepInfo.get(currentStepInstance.stepIndex);
+ if (old!=null && old.next!=null && !old.next.isEmpty()) {
+ Integer lastNext = old.next.iterator().next();
+ OldStepRecord oldNext = oldStepInfo.get(lastNext);
+ if (oldNext!=null && oldNext.context!=null) {
+ // if oldNext has no context then we never ran it, so we aren't repeating, we're replaying
+ if (oldNext.context.output ==null) {
+ // below will access the _previous_ StepInstanceExecutionContext, as oldStepRecord.context is updated at the end of the step
+ // thus below gets the _previous_ output known at this step, saving it in the next
+ oldNext.context.output = old.context.output;
+ }
+
+ oldNext.workflowScratch = getWorkflowScratchVariables();
+ }
+ }
+ }
+
public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) {
if (next==null) return Maybe.absent("Null step ID supplied");
Function<WorkflowExecutionContext, Integer> predefined = PREDEFINED_NEXT_TARGETS.get(next.toLowerCase());
@@ -1494,4 +1649,24 @@
return "neg-"+(index); // unknown
}
+ public static class Converter implements com.fasterxml.jackson.databind.util.Converter<WorkflowExecutionContext,WorkflowExecutionContext> {
+ @Override
+ public WorkflowExecutionContext convert(WorkflowExecutionContext value) {
+ if (value.workflowScratchVariables ==null || value.workflowScratchVariables.isEmpty()) {
+ value.workflowScratchVariables = value.getStepWorkflowScratchAndBacktrackedSteps(null).getLeft();
+ }
+ // note: no special handling needed for output; it is derived from the last non-null step output
+ return value;
+ }
+
+ @Override
+ public JavaType getInputType(TypeFactory typeFactory) {
+ return typeFactory.constructType(WorkflowExecutionContext.class);
+ }
+
+ @Override
+ public JavaType getOutputType(TypeFactory typeFactory) {
+ return typeFactory.constructType(WorkflowExecutionContext.class);
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index d9fad13..b3e6c05 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -41,6 +41,7 @@
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.javalang.Boxing;
import org.apache.brooklyn.util.time.Time;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,24 +113,26 @@
}
if ("output".equals(key)) {
- if (context.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.output);
- if (context.currentStepInstance!=null && context.currentStepInstance.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.output);
- if (context.getPreviousStepOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getPreviousStepOutput());
+ if (context.getOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getOutput());
+ if (context.currentStepInstance!=null && context.currentStepInstance.getOutput() !=null) return TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.getOutput());
+ Object previousStepOutput = context.getPreviousStepOutput();
+ if (previousStepOutput!=null) return TemplateProcessor.wrapAsTemplateModel(previousStepOutput);
return ifNoMatches();
}
Object candidate;
- //workflow.current_step.input.somevar
if (stage.after(WorkflowExpressionStage.STEP_PRE_INPUT)) {
+ //somevar -> workflow.current_step.output.somevar
WorkflowStepInstanceExecutionContext currentStep = context.currentStepInstance;
if (currentStep != null && stage.after(WorkflowExpressionStage.STEP_OUTPUT)) {
- if (currentStep.output instanceof Map) {
- candidate = ((Map) currentStep.output).get(key);
+ if (currentStep.getOutput() instanceof Map) {
+ candidate = ((Map) currentStep.getOutput()).get(key);
if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
}
}
+ //somevar -> workflow.current_step.input.somevar
try {
candidate = currentStep.getInput(key, Object.class);
} catch (Throwable t) {
@@ -186,7 +189,7 @@
//workflow.scratch.somevar
if (stage.after(WorkflowExpressionStage.WORKFLOW_INPUT)) {
- candidate = context.workflowScratchVariables.get(key);
+ candidate = context.getWorkflowScratchVariables().get(key);
if (candidate != null) return TemplateProcessor.wrapAsTemplateModel(candidate);
}
@@ -230,7 +233,7 @@
if ("error".equals(key)) return TemplateProcessor.wrapAsTemplateModel(errorHandlerContext!=null ? errorHandlerContext.error : null);
if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.input);
- if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.output);
+ if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.getOutput());
//current_step.yyy and previous_step.yyy (where yyy is any of the above)
//step.xxx.yyy ? - where yyy is any of the above and xxx any step id
@@ -240,7 +243,7 @@
if ("step".equals(key)) return new WorkflowStepModel();
if ("util".equals(key)) return new WorkflowUtilModel();
- if ("var".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.workflowScratchVariables);
+ if ("var".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.getWorkflowScratchVariables());
return ifNoMatches();
}
@@ -290,7 +293,9 @@
if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(step.input);
if ("output".equals(key)) {
- return TemplateProcessor.wrapAsTemplateModel(step.output != null ? step.output : MutableMap.of());
+ Pair<Object, Set<Integer>> outputOfStep = context.getStepOutputAndBacktrackedSteps(step.stepIndex);
+ Object output = (outputOfStep != null && outputOfStep.getLeft() != null) ? outputOfStep.getLeft() : MutableMap.of();
+ return TemplateProcessor.wrapAsTemplateModel(output);
}
return ifNoMatches();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 1342a1e..4f80f5d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -28,7 +28,6 @@
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +89,7 @@
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Set<BrooklynTaskTags.WorkflowTaskTag> subWorkflows = MutableSet.of();
+ transient Object outputOld;
Object output;
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public Map<String,Object> otherMetadata = MutableMap.of();
@@ -165,6 +165,7 @@
return output;
}
public void setOutput(Object output) {
+ this.outputOld = output;
this.output = output;
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index d2580ea..1a4d5a9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -32,7 +32,6 @@
import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
@@ -408,7 +407,7 @@
WorkflowExecutionContext wc = p.getLeft();
if (wc.getCurrentStepIndex()==null || wc.getCurrentStepIndex()==WorkflowExecutionContext.STEP_INDEX_FOR_START) {
// initialize to last if it hasn't started
- wc.getWorkflowScratchVariables().putAll(reducingV);
+ wc.updateWorkflowScratchVariables(reducingV);
}
DynamicTasks.queue(p.getRight()).getUnchecked();
@@ -428,7 +427,7 @@
returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
} else {
context.setOutput(reducingV);
- context.getWorkflowExectionContext().getWorkflowScratchVariables().putAll(reducingV);
+ context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
returnValue = reducingV;
}
@@ -543,7 +542,7 @@
String tivn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_index_var_name, String.class);
- if (targetIndexOrNull!=null) nestedWorkflowContext.getWorkflowScratchVariables().put(tivn==null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull);
+ if (targetIndexOrNull!=null) nestedWorkflowContext.updateWorkflowScratchVariable(tivn == null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull);
initializeSubWorkflowForTarget(context, target, nestedWorkflowContext);
return nestedWorkflowContext;
@@ -551,7 +550,7 @@
protected void initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, Object target, WorkflowExecutionContext nestedWorkflowContext) {
String tvn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_var_name, String.class);
- nestedWorkflowContext.getWorkflowScratchVariables().put(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target);
+ nestedWorkflowContext.updateWorkflowScratchVariable(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target);
}
/** Returns a top-level workflow running the workflow defined here */
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
index 4d9879b..3c179bc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
@@ -65,10 +65,10 @@
if (tvn.startsWith("{") && tvn.endsWith("}")) {
String[] spreadVars = tvn.substring(1, tvn.length() - 1).split(",");
if (!(target instanceof Map)) throw new IllegalStateException("Spread vars indicated in foreach but target is not a map");
- nestedWorkflowContext.getWorkflowScratchVariables().put(TARGET_VAR_NAME_DEFAULT, target);
+ nestedWorkflowContext.updateWorkflowScratchVariable(TARGET_VAR_NAME_DEFAULT, target);
for (String spreadVar: spreadVars) {
String svt = spreadVar.trim();
- nestedWorkflowContext.getWorkflowScratchVariables().put(svt, ((Map)target).get(svt));
+ nestedWorkflowContext.updateWorkflowScratchVariable(svt, ((Map)target).get(svt));
}
return;
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
index 4ae6d21..3623c47 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java
@@ -43,7 +43,7 @@
if (variable ==null) throw new IllegalArgumentException("Variable name is required");
String name = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, variable.name, String.class);
if (Strings.isBlank(name)) throw new IllegalArgumentException("Variable name is required");
- context.getWorkflowExectionContext().getWorkflowScratchVariables().remove(name);
+ context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, null);
return context.getPreviousStepOutput();
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
index 197aebe..da68001 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java
@@ -89,7 +89,7 @@
Object resolvedValue = new SetVariableWorkflowStep.ConfigurableInterpolationEvaluation(context, type, data, context.getInputOrDefault(INTERPOLATION_MODE), context.getInputOrDefault(INTERPOLATION_ERRORS)).evaluate();
- context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+ context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
context.noteOtherMetadata("Loaded", ByteSizeStrings.java().makeSizeString(data.getBytes().length)+" from "+url+" into "+variable);
return context.getPreviousStepOutput();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
index 6531ba0..7da45ca 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java
@@ -143,9 +143,9 @@
throw new IllegalArgumentException("Invalid list index " + listIndex);
}
oldValue = l.set(listIndex, resolvedValue);
- context.getWorkflowExectionContext().getWorkflowScratchVariables().put(listName, l);
+ context.getWorkflowExectionContext().updateWorkflowScratchVariable(listName, l);
} else {
- oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+ oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
}
return oldValue;
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
index db5be0b..3b597da 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java
@@ -82,7 +82,7 @@
log.debug("Wait resolved after "+duration+", "+input.get(unresolvedValue)+" is: "+resolvedValue);
if (name!=null) {
- Object oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue);
+ Object oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue);
if (oldValue!=null) context.noteOtherMetadata("Previous value", oldValue);
context.noteOtherMetadata("Value set", resolvedValue);
return context.getPreviousStepOutput();
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
index 128375e..16e88d8 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
@@ -21,7 +21,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityInitializer;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -344,12 +343,12 @@
@Test
public void testWorkflowResolutionScratchVariable() {
- doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "bar"), "${foo}", "bar");
+ doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "bar"), "${foo}", "bar");
}
@Test
public void testWorkflowResolutionScratchVariableCoerced() {
- doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "7"), "${foo}", "integer", 7);
+ doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "7"), "${foo}", "integer", 7);
}
@Test
@@ -359,7 +358,7 @@
@Test
public void testWorkflowResolutionMore() {
- doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar");
+ doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar");
doTestOfWorkflowVariable(context -> context.getEntity().config().set(ConfigKeys.newConfigKey(Object.class, "foo"), MutableMap.of("bar", "baz")), "${entity.config.foo.bar}", "baz");
}
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index b788413..d95b053 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -873,7 +873,7 @@
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId())); // should replace the one above
}
- @Test(groups="Integration")
+ @Test(groups="Integration") // very slow
public void testRetentionManyWaysIncludingDisabled() throws Exception {
app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
new file mode 100644
index 0000000..ec81b25
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
+import org.apache.brooklyn.entity.stock.BasicApplication;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.FileOutputStream;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.Map;
+
+public class WorkflowSizeTest extends BrooklynMgmtUnitTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkflowSizeTest.class);
+
+ private BasicApplication app;
+
+ protected void createAppWithEffector(List<?> steps) {
+ WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+
+ if (this.app!=null) throw new IllegalStateException("Already have an app");
+ this.app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
+ WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+ .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+ .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("param", null))
+ .configure(WorkflowEffector.STEPS, (List) steps)
+ );
+ eff.apply((EntityLocal)app);
+ }
+
+ @Test
+ public void testSizeOfAllSensors() throws JsonProcessingException {
+ createAppWithEffector(MutableList.of(
+ "let pc = ${param}",
+ "let map myMap = {}",
+ "transform param | prepend hello-",
+ "let myMap.a = ${param}",
+ "let myMap.b = ${output}",
+ "return ${myMap}"
+ ));
+
+ String sampleData = "sample data for testing something big\n";
+
+ app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+
+ Map<String, Integer> sizes = getSensorSizes();
+ sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+
+ Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result < 10*1000);
+
+ // print out the above, search for "something big" to see where the size is used
+ String out = BeanWithTypeUtils.newYamlMapper(mgmt, true, null, true).writeValueAsString(
+ app.sensors().get(Sensors.newSensor(Object.class, "internals.brooklyn.workflow")));
+ log.info("WORKFLOW IS:\n"+out);
+
+ app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+ sizes = getSensorSizes();
+ sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+ Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result < 20*1000);
+
+ // 100k payload now -> bumps sensor (json) size from 5k to 3MB (before any optimization)
+ // [xml persistence is less of an issue because it will use a shared reference]
+ // removing output which is identical to the previous gives minor savings (in this test): 3380416 -> 3176074
+ // removing scratch at workflow which matches a step reduces further: -> 2869522
+ // switching to model where scratch is produced incrementally reduces much more -> 1847848
+ for (int i=0; i<1000; i++) {
+ for (int j=0; j<10; j++) sampleData += "0123456789";
+ sampleData += "\n";
+ }
+ app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), MutableMap.of("param", sampleData)).getUnchecked();
+ sizes = getSensorSizes();
+ sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+ Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), result -> result > 100*1000);
+ }
+
+ protected Map<String,Integer> getSensorSizes() {
+ //Dumper.dumpInfo(app);
+ Map<String,Integer> sizes = MutableMap.of();
+ for (int retryWhileCME=0; ; retryWhileCME++) {
+ try {
+ sizes.clear();
+ app.sensors().getAll().forEach((k, v) -> {
+ try {
+ sizes.put(k.getName(), BeanWithTypeUtils.newMapper(mgmt, false, null, false).writeValueAsString(v).length());
+ } catch (JsonProcessingException e) {
+ throw Exceptions.propagate(e);
+ }
+ });
+ break;
+ } catch (Exception e) {
+ boolean allowedToRetry = false;
+ allowedToRetry |= Exceptions.getFirstThrowableOfType(e, ConcurrentModificationException.class)!=null;
+ allowedToRetry |= Exceptions.getFirstThrowableOfType(e, NullPointerException.class)!=null;
+ if (allowedToRetry && retryWhileCME<10) {
+ log.info("Serializing sensors failed; will retry: "+e);
+ Time.sleep(100);
+ continue;
+ }
+ throw Exceptions.propagate(e);
+ }
+ }
+ return sizes;
+ }
+}