blob: 14e69e0951eae62127657bfab1a50596e6061b66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAdjuncts;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.resolve.jackson.JsonPassThroughDeserializer;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.typereg.RegisteredTypes;
import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.BrooklynTypeNameResolution;
import org.apache.brooklyn.util.core.predicates.DslPredicates;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.text.TemplateProcessor;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.javalang.Threads;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class WorkflowExecutionContext {
private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionContext.class);
public static final String LABEL_FOR_ERROR_HANDLER = "error-handler";
public static final int STEP_INDEX_FOR_START = -1;
public static final int STEP_INDEX_FOR_END = -2;
public static final int STEP_INDEX_FOR_ERROR_HANDLER = -3;
public static final String STEP_TARGET_NAME_FOR_START = "start";
public static final String STEP_TARGET_NAME_FOR_END = "end";
public static final String STEP_TARGET_NAME_FOR_LAST = "last";
public static final String STEP_TARGET_NAME_FOR_HERE = "here";
public static final String STEP_TARGET_NAME_FOR_EXIT = "exit";
public static final String STEP_TARGET_NAME_FOR_DEFAULT = "default";
public static final Map<String, Function<WorkflowExecutionContext,Integer>> PREDEFINED_NEXT_TARGETS = MutableMap.<String, Function<WorkflowExecutionContext,Integer>>of(
STEP_TARGET_NAME_FOR_START, c -> c==null? STEP_INDEX_FOR_START : 0,
STEP_TARGET_NAME_FOR_END, c -> c==null ? STEP_INDEX_FOR_END : c.stepsDefinition.size(),
STEP_TARGET_NAME_FOR_LAST, c -> c==null ? null : c.replayableLastStep,
STEP_TARGET_NAME_FOR_HERE, c -> c==null ? null : null,
STEP_TARGET_NAME_FOR_EXIT, c -> c==null ? null : null,
STEP_TARGET_NAME_FOR_DEFAULT, c -> c==null ? null : c.currentStepIndex+1).asUnmodifiable();
String name;
@Nullable BrooklynObject adjunct;
Entity entity;
public enum WorkflowStatus {
STAGED(false, false, false, false),
RUNNING(true, false, false, false),
SUCCESS(true, true, false, true),
/** useful information, usually cannot persisted by the time we've set this the first time, but could set on rebind */ ERROR_SHUTDOWN(true, true, true, false),
/** task failed because entity destroyed */ ERROR_ENTITY_DESTROYED(true, true, true, true),
/** task cancelled, timeout, or other interrupt, usually recursively (but not entity destroyed or server shutdown) */ ERROR_CANCELLED(true, true, true, true),
/** any other error, e.g. workflow step failed or data not immediately available (the interrupt used internally is not relevant) */ ERROR(true, true, true, true);
public final boolean started;
public final boolean ended;
public final boolean error;
public final boolean expirable;
WorkflowStatus(boolean started, boolean ended, boolean error, boolean expirable) { this.started = started; this.ended = ended; this.error = error; this.expirable = expirable; }
}
WorkflowStatus status;
Instant lastStatusChangeTime;
@JsonIgnore private transient WorkflowExecutionContext parent;
private BrooklynTaskTags.WorkflowTaskTag parentTag;
// should be treated as raw json
@JsonDeserialize(contentUsing = JsonPassThroughDeserializer.class)
List<Object> stepsDefinition;
DslPredicates.DslPredicate condition;
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String,Object> input = MutableMap.of();
@JsonIgnore // persist as sensor but not via REST in case it has secrets resolved
Map<String,Object> inputResolved = MutableMap.of();
Object outputDefinition;
Object output;
Object lock;
Duration timeout;
Object onError;
String workflowId;
/** current or most recent executing task created for this workflow, corresponding to task */
String taskId;
transient Task<Object> task;
@JsonProperty("retention")
WorkflowRetentionAndExpiration.WorkflowRetentionSettings retention;
/** all tasks created for this workflow */
Set<WorkflowReplayUtils.WorkflowReplayRecord> replays = MutableSet.of();
transient WorkflowReplayUtils.WorkflowReplayRecord replayCurrent = null;
/** null if no replay point, otherwise step number of the last completed replay point, or -1 if should replay from start and -2 if completed successfully */
Integer replayableLastStep;
Boolean replayableFromStart;
Boolean replayableAutomatically;
Boolean replayableDisabled;
Boolean idempotentAll;
Integer currentStepIndex;
Integer previousStepIndex;
String previousStepTaskId;
WorkflowStepInstanceExecutionContext currentStepInstance;
/** set if an error handler is the last thing which ran */
String errorHandlerTaskId;
/** set for the last _step_ inside the error handler */
WorkflowStepInstanceExecutionContext errorHandlerContext;
Map<Integer, OldStepRecord> oldStepInfo = MutableMap.of();
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static class OldStepRecord {
/** count of runs started */
int countStarted = 0;
/** count of runs completed */
int countCompleted = 0;
/** context for last _completed_ instance of step */
WorkflowStepInstanceExecutionContext context;
/** is step replayable */
Boolean replayableFromHere;
/** scratch for last _started_ instance of step */
Map<String,Object> workflowScratch;
/** steps that immediately preceded this, updated when _this_ step started, with most recent first */
Set<Integer> previous;
/** steps that immediately followed this, updated when _next_ step started, with most recent first */
Set<Integer> next;
String previousTaskId;
String nextTaskId;
}
Map<String,Object> workflowScratchVariables = MutableMap.of();
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String,List<Instant>> retryRecords = MutableMap.of();
// deserialization constructor
private WorkflowExecutionContext() {}
public static WorkflowExecutionContext newInstancePersisted(BrooklynObject entityOrAdjunctWhereRunning, WorkflowContextType wcType, String name, ConfigBag paramsDefiningWorkflow,
Collection<ConfigKey<?>> extraConfigKeys, ConfigBag extraInputs, Map<String, Object> optionalTaskFlags) {
WorkflowExecutionContext w = newInstanceUnpersistedWithParent(entityOrAdjunctWhereRunning, null, wcType, name, paramsDefiningWorkflow, extraConfigKeys, extraInputs, optionalTaskFlags);
w.persist();
return w;
}
public enum WorkflowContextType {
SENSOR, EFFECTOR, POLICY, NESTED_WORKFLOW, OTHER
}
public static WorkflowExecutionContext newInstanceUnpersistedWithParent(BrooklynObject entityOrAdjunctWhereRunning, WorkflowExecutionContext parent,
WorkflowContextType wcType, String name, ConfigBag paramsDefiningWorkflow,
Collection<ConfigKey<?>> extraConfigKeys, ConfigBag extraInputs, Map<String, Object> optionalTaskFlags) {
// parameter defs
Map<String,ConfigKey<?>> parameters = MutableMap.of();
Maybe<BrooklynClassLoadingContext> loader = RegisteredTypes.getClassLoadingContextMaybe(entityOrAdjunctWhereRunning);
Effectors.parseParameters(paramsDefiningWorkflow.get(WorkflowCommonConfig.PARAMETER_DEFS), loader.orNull()).forEach(p -> parameters.put(p.getName(), Effectors.asConfigKey(p)));
if (extraConfigKeys!=null) extraConfigKeys.forEach(p -> parameters.put(p.getName(), p));
// inputs, unresolved first
ConfigBag inputRaw = ConfigBag.newInstance();
inputRaw.putAll(paramsDefiningWorkflow.get(WorkflowCommonConfig.INPUT));
if (extraInputs!=null) inputRaw.putAll(extraInputs.getAllConfig());
parameters.values().forEach(p -> {
if (p.hasDefaultValue() && !inputRaw.containsKey(p.getName())) inputRaw.put((ConfigKey)p, p.getDefaultValue());
});
MutableMap<String,Object> input = MutableMap.of();
inputRaw.forEach( (k,v) -> {
ConfigKey<?> kc = parameters.get(k);
// coerce, but don't freemarker resolve inputs because that's the job of the caller (e.g. in nested workflow, inputs resolved relative to calling workflow)
Object v2 = kc == null ? v : inputRaw.get(kc);
input.put(k, v2);
});
WorkflowExecutionContext w = new WorkflowExecutionContext(entityOrAdjunctWhereRunning, parent, name,
paramsDefiningWorkflow.get(WorkflowCommonConfig.STEPS),
input,
paramsDefiningWorkflow.get(WorkflowCommonConfig.OUTPUT),
WorkflowReplayUtils.updaterForReplayableAtWorkflow(paramsDefiningWorkflow, wcType == WorkflowContextType.NESTED_WORKFLOW),
optionalTaskFlags);
w.getStepsResolved(); // ensure steps resolve at this point; should be true even if condition doesn't apply (though input might not be valid without condition)
w.retention = WorkflowRetentionParser.parse(paramsDefiningWorkflow.get(WorkflowCommonConfig.RETENTION), w).init(w);
w.lock = paramsDefiningWorkflow.get(WorkflowCommonConfig.LOCK);
w.timeout = paramsDefiningWorkflow.get(WorkflowCommonConfig.TIMEOUT);
w.onError = paramsDefiningWorkflow.get(WorkflowCommonConfig.ON_ERROR);
// fail fast if error steps not resolveable
WorkflowStepResolution.resolveSubSteps(w.getManagementContext(), "error handling", WorkflowErrorHandling.wrappedInListIfNecessaryOrNullIfEmpty(w.onError));
// some fields need to be resolved at setting time, in the context of the workflow
w.setCondition(w.resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_STARTING_POST_INPUT, paramsDefiningWorkflow.getStringKey(WorkflowCommonConfig.CONDITION.getName()), WorkflowCommonConfig.CONDITION.getTypeToken()));
// finished -- checkpoint noting this has been created but not yet started
w.updateStatus(WorkflowStatus.STAGED);
return w;
}
protected WorkflowExecutionContext(BrooklynObject entityOrAdjunctWhereRunning, WorkflowExecutionContext parent, String name,
List<Object> stepsDefinition, Map<String,Object> input, Object output,
Consumer<WorkflowExecutionContext> replayableInitializer, Map<String, Object> optionalTaskFlags) {
initParent(parent);
this.name = name;
this.adjunct = entityOrAdjunctWhereRunning instanceof Entity ? null : entityOrAdjunctWhereRunning;
this.entity = entityOrAdjunctWhereRunning instanceof Entity ? (Entity)entityOrAdjunctWhereRunning : ((EntityAdjuncts.EntityAdjunctProxyable)entityOrAdjunctWhereRunning).getEntity();
this.stepsDefinition = stepsDefinition;
this.input = input;
this.outputDefinition = output;
if (replayableInitializer!=null) replayableInitializer.accept(this);
TaskBuilder<Object> tb = Tasks.builder().dynamic(true);
if (optionalTaskFlags!=null) tb.flags(optionalTaskFlags);
if (Strings.isBlank(tb.getDisplayName())) tb.displayName(name);
task = tb.body(new Body()).build();
WorkflowReplayUtils.updateOnWorkflowStartOrReplay(this, task, "initial run", null);
workflowId = taskId = task.getId();
TaskTags.addTagDynamically(task, BrooklynTaskTags.WORKFLOW_TAG);
TaskTags.addTagDynamically(task, BrooklynTaskTags.tagForWorkflow(this));
// currently workflow ID is the same as the task ID assigned initially. (but if replayed they will be different.)
// there is no deep reason or need for this, it is just convenient, and used for tests.
//this.workflowId = Identifiers.makeRandomId(8);
}
public void initParent(WorkflowExecutionContext parent) {
this.parent = parent;
this.parentTag = parent==null ? null : BrooklynTaskTags.tagForWorkflow(parent);
}
@JsonIgnore
public WorkflowExecutionContext getParent() {
if (parent==null && parentTag!=null) {
Entity entity = getManagementContext().getEntityManager().getEntity(parentTag.getEntityId());
if (entity==null) {
log.warn("Parent workflow "+parentTag+" for "+this+" is on an entity no longer known; unparenting this workflow");
parentTag = null;
} else {
parent = new WorkflowStatePersistenceViaSensors(getManagementContext()).getWorkflows(entity).get(parentTag.getWorkflowId());
if (parent==null) {
log.warn("Parent workflow "+parentTag+" for "+this+" is no longer known; unparenting this workflow");
parentTag = null;
}
}
}
return parent;
}
public static void validateSteps(ManagementContext mgmt, List<WorkflowStepDefinition> steps, boolean alreadyValidatedIndividualSteps) {
if (!alreadyValidatedIndividualSteps) {
steps.forEach(w -> w.validateStep(mgmt, null));
}
computeStepsWithExplicitIdById(steps);
}
static Map<String,Pair<Integer,WorkflowStepDefinition>> computeStepsWithExplicitIdById(List<WorkflowStepDefinition> steps) {
Map<String,Pair<Integer,WorkflowStepDefinition>> stepsWithExplicitId = MutableMap.of();
for (int i = 0; i<steps.size(); i++) {
WorkflowStepDefinition s = steps.get(i);
if (s.id != null) {
if (PREDEFINED_NEXT_TARGETS.containsKey(s.id.toLowerCase()))
throw new IllegalStateException("Token '" + s + "' is a reserved word and cannot be used as a step ID");
Pair<Integer, WorkflowStepDefinition> old = stepsWithExplicitId.put(s.id, Pair.of(i, s));
if (old != null) throw new IllegalStateException("Same step ID '" + s + "' used for multiple steps ("+(old.getLeft()+1)+" and "+(i+1)+")");
}
}
return stepsWithExplicitId;
}
public void setCondition(DslPredicates.DslPredicate condition) {
this.condition = condition;
}
@Override
public String toString() {
return "Workflow<" + name + " - " + workflowId + ">";
}
@JsonIgnore
public BrooklynObject getEntityOrAdjunctWhereRunning() {
if (adjunct!=null) return adjunct;
return entity;
}
public BrooklynTaskTags.WorkflowTaskTag getParentTag() {
return parentTag;
}
public Map<String, Object> getWorkflowScratchVariables() {
return workflowScratchVariables;
}
public Map<String, List<Instant>> getRetryRecords() {
return retryRecords;
}
@JsonIgnore
public Object getConditionTarget() {
if (getWorkflowScratchVariables()!=null) {
Object v = getWorkflowScratchVariables().get("target");
// should we also set the entity? otherwise it will take from the task. but that should only apply
// in a task where the context entity is set, so for now rely on that.
if (v!=null) return v;
}
return getEntityOrAdjunctWhereRunning();
}
@JsonIgnore
public Maybe<Task<Object>> getTask(boolean checkCondition) {
if (checkCondition && condition!=null) {
if (!condition.apply(getConditionTarget())) return Maybe.absent(new IllegalStateException("This workflow cannot be run at present: condition not satisfied"));
}
if (task==null) {
if (taskId !=null) {
task = (Task<Object>) getManagementContext().getExecutionManager().getTask(taskId);
}
if (task==null) {
return Maybe.absent(new IllegalStateException("Task for "+this+" no longer available"));
}
}
return Maybe.of(task);
}
public Factory factory(boolean allowInternallyEvenIfDisabled) {
return new Factory(allowInternallyEvenIfDisabled);
}
public class Factory {
private final boolean allowInternallyEvenIfDisabled;
protected Factory(boolean allowInternallyEvenIfDisabled) {
this.allowInternallyEvenIfDisabled = allowInternallyEvenIfDisabled;
}
public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromStep(int stepIndex0, String reason, boolean forced) {
if (!forced) checkNotDisabled();
int stepIndex = stepIndex0;
if (!forced) {
stepIndex = Maybe.ofDisallowingNull(WorkflowReplayUtils.findNearestReplayPoint(WorkflowExecutionContext.this, stepIndex0))
.orThrow(() -> new IllegalStateException("Workflow is not replayable: no replay points found backtracking from " + stepIndex0));
log.debug("Request to replay from step " + stepIndex0 + ", nearest replay point is " + stepIndex);
}
return new WorkflowStepDefinition.ReplayContinuationInstructions(stepIndex, reason, null, forced);
}
public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromLastReplayable(String reason, boolean forced) {
return makeInstructionsForReplayingFromStep(replayableLastStep != null ? replayableLastStep : STEP_INDEX_FOR_START, reason, forced);
}
public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromStart(String reason, boolean forced) {
return makeInstructionsForReplayingFromStep(STEP_INDEX_FOR_START, reason, forced);
}
public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResuming(String reason, boolean forced) {
return makeInstructionsForReplayResuming(reason, forced, null);
}
public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResumingForcedWithCustom(String reason, Runnable code) {
return makeInstructionsForReplayResuming(reason, true, code);
}
protected WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResuming(String reason, boolean forced, Runnable code) {
if (!forced) checkNotDisabled();
Integer replayFromStep = null;
if (currentStepIndex == null) {
// not yet started
replayFromStep = STEP_INDEX_FOR_START;
} else if (currentStepInstance == null || currentStepInstance.stepIndex != currentStepIndex) {
// replaying from a different step, or current step which has either not run or completed but didn't save
log.debug("Replaying workflow '" + name + "', cannot replay within step " + currentStepIndex + " because step instance not known; will reinitialize then replay that step");
replayFromStep = currentStepIndex;
}
if (!forced && replayFromStep == null) {
// instructions should be made even if subworkflows might reject them; that's the intention of "resume" without force, vs replay from last
if (!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT, allowInternallyEvenIfDisabled)) {
if (code != null) {
// we could allow this, but we don't need it
throw new IllegalArgumentException("Cannot supply code to here without forcing as workflow does not support replay resuming at this point");
}
log.debug("Request to replay resuming " + WorkflowExecutionContext.this + " at non-idempotent step; rolling back to " + replayableLastStep);
if (replayableLastStep == null) {
throw new IllegalArgumentException("Cannot replay resuming as there are no replay points and last step " + currentStepIndex + " is not idempotent");
}
return makeInstructionsForReplayingFromStep(replayableLastStep, reason, false);
}
}
return new WorkflowStepDefinition.ReplayContinuationInstructions(replayFromStep, reason, code, forced);
}
public Task<Object> createTaskReplaying(WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) {
return createTaskReplaying(null, continuationInstructions);
}
public Task<Object> createTaskReplaying(Runnable intro, WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) {
if (continuationInstructions==null || !continuationInstructions.forced) checkNotDisabled();
if (task != null && !task.isDone()) {
if (!task.isSubmitted()) {
if (parent!=null && parent.getReplays().size()>1) {
log.debug("Abandoning sub-workflow task that was never submitted, not unusual as parent seems to be replaying: " + task + " for " + WorkflowExecutionContext.this);
} else {
log.warn("Abandoning workflow task that was never submitted: " + task + " for " + WorkflowExecutionContext.this);
}
} else {
if (isSubmitterAncestor(Tasks.current(), task)) {
// not sure we need this check
log.debug("Replaying containing workflow " + WorkflowExecutionContext.this + " in task " + task + " which is an ancestor of " + Tasks.current());
} else {
log.warn("Unable to replay workflow " + WorkflowExecutionContext.this + " from " + Tasks.current() + " because workflow task " + task + " is ongoing; will delay up to 1s then retry");
// there can be a slight race between tasks ending and the workflow reporting a failure and replaying;
// esp in tests, but also in real world, forgive such a situation by delaying the replay for a short time
if (!task.blockUntilEnded(Duration.ONE_SECOND)) {
log.warn("Unable to replay workflow " + WorkflowExecutionContext.this + " from " + Tasks.current() + " because workflow task " + task + " is ongoing (waited 1s, still ongoing; so rethrowing)");
throw new IllegalStateException("Cannot replay ongoing workflow, given " + continuationInstructions);
}
}
}
}
String explanation = continuationInstructions.customBehaviorExplanation != null ? continuationInstructions.customBehaviorExplanation : "no explanation";
task = Tasks.builder().dynamic(true).displayName(name + " (" + explanation + ")")
.tag(BrooklynTaskTags.tagForWorkflow(WorkflowExecutionContext.this))
.tag(BrooklynTaskTags.WORKFLOW_TAG)
.body(new Body(continuationInstructions).withIntro(intro)).build();
WorkflowReplayUtils.updateOnWorkflowStartOrReplay(WorkflowExecutionContext.this, task, continuationInstructions.customBehaviorExplanation, continuationInstructions.stepToReplayFrom);
taskId = task.getId();
return task;
}
public boolean isDisabled() {
if (allowInternallyEvenIfDisabled) return false;
if (Boolean.TRUE.equals(replayableDisabled)) return true;
return false;
}
public void checkNotDisabled() {
if (isDisabled()) throw new IllegalStateException("Replays disabled on "+WorkflowExecutionContext.this);
}
}
private boolean isSubmitterAncestor(Task current, Task<Object> possibleAncestor) {
if (current==null) return false;
if (current.equals(possibleAncestor)) return true;
return isSubmitterAncestor(current.getSubmittedByTask(), possibleAncestor);
}
public Entity getEntity() {
return entity;
}
@JsonIgnore
public ManagementContext getManagementContext() {
return ((EntityInternal)getEntity()).getManagementContext();
}
@JsonIgnore
protected WorkflowStatePersistenceViaSensors getPersister() {
return new WorkflowStatePersistenceViaSensors(getManagementContext());
}
public void persist() {
if (isInErrorHandlerSubWorkflow()) {
// currently don't persist error handler sub-workflows
return;
}
getPersister().checkpoint(this);
}
/** Get the value of the input. Supports Brooklyn DSL resolution but NOT Freemarker resolution. */
public Object getInput(String key) {
return getInputMaybe(key, TypeToken.of(Object.class), Maybe.ofAllowingNull(null)).get();
}
public <T> Maybe<T> getInputMaybe(String key, TypeToken<T> type, Maybe<T> valueIfUndefined) {
if (!input.containsKey(key)) return valueIfUndefined;
if (inputResolved.containsKey(key)) return Maybe.ofAllowingNull((T)inputResolved.get(key));
Object v = input.get(key);
// normally do DSL resolution/coercion only, not workflow syntax here (as no workflow scope);
// except if we are in a nested workflow, we allow resolving from the parent.
// (alternatively we could resolve when starting the custom workflow; that might be better.)
Maybe<T> vm = null;
if (v instanceof String && parent!=null && parent.getCurrentStepInstance()!=null) {
try {
vm = Maybe.of(parent.getCurrentStepInstance().resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, (String) v, type));
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
vm = Maybe.absent(e);
}
}
if (vm==null || vm.isAbsent()) {
Maybe<T> vm2 = Tasks.resolving(v).as(type).context(getEntity()).immediately(true).deep().getMaybe();
if (vm2.isPresent() || vm==null) vm = vm2; // if errors in both, prefer error in first
}
if (vm.isPresent()) {
if (WorkflowStepInstanceExecutionContext.REMEMBER_RESOLVED_INPUT) {
// this will keep spending time resolving, but will resolve the resolved value
inputResolved.put(key, vm.get());
}
}
return vm;
}
public TypeToken<?> lookupType(String typeName, Supplier<TypeToken<?>> ifUnset) {
if (Strings.isBlank(typeName)) return ifUnset.get();
BrooklynClassLoadingContext loader = getEntity() != null ? RegisteredTypes.getClassLoadingContext(getEntity()) : null;
return new BrooklynTypeNameResolution.BrooklynTypeNameResolver("workflow", loader, true, true).getTypeToken(typeName);
}
/** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)} but without type coercion */
public Object resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, String expression) {
return resolve(stage, expression, Object.class);
}
/** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)} */
public <T> T resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, Class<T> type) {
return resolve(stage, expression, TypeToken.of(type));
}
/** resolution of ${interpolation} and $brooklyn:dsl and deferred suppliers, followed by type coercion.
* if the type is a string, null is not permitted, otherwise it is. */
public <T> T resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) {
return new WorkflowExpressionResolution(this, stage, false, false).resolveWithTemplates(expression, type);
}
public <T> T resolveCoercingOnly(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) {
return new WorkflowExpressionResolution(this, stage, false, false).resolveCoercingOnly(expression, type);
}
/** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)}, but returning DSL/supplier for values (so the indication of their dynamic nature is preserved, even if the value returned by it is resolved;
* this is needed e.g. for conditions which treat dynamic expressions differently to explicit values) */
public <T> T resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) {
return new WorkflowExpressionResolution(this, stage, false, true).resolveWithTemplates(expression, type);
}
/** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)}, but waiting on any expressions which aren't ready */
public <T> T resolveWaiting(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) {
return new WorkflowExpressionResolution(this, stage, true, false).resolveWithTemplates(expression, type);
}
/** resolution of ${interpolation} and $brooklyn:dsl and deferred suppliers, followed by type coercion */
public <T> T resolveConfig(WorkflowExpressionResolution.WorkflowExpressionStage stage, ConfigBag config, ConfigKey<T> key) {
Object v = config.getStringKey(key.getName());
if (v==null) return null;
return resolve(stage, v, key.getTypeToken());
}
public WorkflowStatus getStatus() {
return status;
}
void updateStatus(WorkflowStatus newStatus) {
status = newStatus;
lastStatusChangeTime = Instant.now();
}
@JsonIgnore
public Instant getLastStatusChangeTime() {
return lastStatusChangeTime;
}
public Integer getCurrentStepIndex() {
return currentStepIndex;
}
public WorkflowStepInstanceExecutionContext getCurrentStepInstance() {
return currentStepInstance;
}
public Integer getPreviousStepIndex() {
return previousStepIndex;
}
// clear this when error handler is not running
transient Object lastErrorHandlerOutput = null;
@JsonIgnore
public Object getPreviousStepOutput() {
if (lastErrorHandlerOutput!=null) return lastErrorHandlerOutput;
if (previousStepIndex==null) return null;
OldStepRecord last = oldStepInfo.get(previousStepIndex);
if (last!=null && last.context!=null) return last.context.output;
return null;
}
public Object getOutput() {
return output;
}
public String getName() {
return name;
}
public String getWorkflowId() {
return workflowId;
}
public String getTaskId() {
return taskId;
}
public Set<WorkflowReplayUtils.WorkflowReplayRecord> getReplays() {
return replays;
}
public Integer getReplayableLastStep() {
return replayableLastStep;
}
public WorkflowStepInstanceExecutionContext getErrorHandlerContext() {
return errorHandlerContext;
}
@JsonIgnore
public String getRetentionHash() {
if (retention!=null && Strings.isNonBlank(retention.hash)) return retention.hash;
if (Strings.isNonBlank(getName())) return getName();
return "anonymous-workflow-"+Math.abs(getStepsDefinition().hashCode());
}
public void updateRetentionFrom(WorkflowRetentionAndExpiration.WorkflowRetentionSettings other) {
WorkflowRetentionAndExpiration.WorkflowRetentionSettings r = getRetentionSettings();
r.updateFrom(other);
retention = r;
retentionDefault = null;
}
@JsonIgnore private transient WorkflowRetentionAndExpiration.WorkflowRetentionSettings retentionDefault;
@JsonIgnore
public WorkflowRetentionAndExpiration.WorkflowRetentionSettings getRetentionSettings() {
if (retention==null) {
if (retentionDefault==null) {
retentionDefault = new WorkflowRetentionAndExpiration.WorkflowRetentionSettings().init(this);
}
return retentionDefault;
}
return retention;
}
public void markShutdown() {
log.debug(this+" was "+this.status+" but now marking as "+WorkflowStatus.ERROR_SHUTDOWN+"; compensating workflow should be triggered shortly");
this.updateStatus(WorkflowStatus.ERROR_SHUTDOWN);
// don't persist; that will happen when workflows are kicked off
}
@JsonIgnore
/** Error handlers _could_ launch sub-workflow, but they typically don't */
protected boolean isInErrorHandlerSubWorkflow() {
if (getParent()!=null) {
if (getParent().getErrorHandlerContext()!=null) {
return true;
}
return getParent().isInErrorHandlerSubWorkflow();
}
return false;
}
/** look in tasks, steps, and replays to find most recent activity */
// keep on jackson serialization for api?
public long getMostRecentActivityTime() {
AtomicLong result = new AtomicLong(-1);
Consumer<Long> consider = l -> {
if (l!=null && l>result.get()) result.set(l);
};
Consumer<Task> considerTask = task -> {
if (task!=null) {
consider.accept(task.getEndTimeUtc());
consider.accept(task.getStartTimeUtc());
consider.accept(task.getSubmitTimeUtc());
}
};
considerTask.accept(getTask(false).orNull());
Consumer<WorkflowReplayUtils.WorkflowReplayRecord> considerReplay = replay -> {
if (replay!=null) {
consider.accept(replay.endTimeUtc);
consider.accept(replay.startTimeUtc);
consider.accept(replay.submitTimeUtc);
}
};
if (replayCurrent!=null) {
considerReplay.accept(replayCurrent);
} else if (!replays.isEmpty()) {
considerReplay.accept(Iterables.getLast(replays));
}
if (currentStepInstance!=null) {
considerTask.accept(getManagementContext().getExecutionManager().getTask(currentStepInstance.getTaskId()));
}
return result.get();
}
public List<Object> getStepsDefinition() {
return MutableList.copyOf(stepsDefinition).asUnmodifiable();
}
transient Map<String,Pair<Integer,WorkflowStepDefinition>> stepsWithExplicitId;
transient List<WorkflowStepDefinition> stepsResolved;
@JsonIgnore
List<WorkflowStepDefinition> getStepsResolved() {
if (stepsResolved ==null) {
stepsResolved = MutableList.copyOf(WorkflowStepResolution.resolveSteps(getManagementContext(), WorkflowExecutionContext.this.stepsDefinition));
}
return stepsResolved;
}
@JsonIgnore
public Map<String, Pair<Integer,WorkflowStepDefinition>> getStepsWithExplicitIdById() {
if (stepsWithExplicitId ==null) stepsWithExplicitId = computeStepsWithExplicitIdById(getStepsResolved());
return stepsWithExplicitId;
}
protected class Body implements Callable<Object> {
private WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions;
private Runnable intro = null;
private int stepsRun = 0;
public Body() {}
public Body(WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) {
this.continuationInstructions = continuationInstructions;
}
@Override
public String toString() {
return "WorkflowExecutionContext.Body["+workflowId+"; " + continuationInstructions + "]";
}
@Override
public Object call() throws Exception {
if (intro!=null) {
// intro needed to make dangling resumption wait
intro.run();
}
return callWithLock(this::callSteps);
}
protected Object callWithLock(Callable<Callable<Object>> handler) throws Exception {
AttributeSensor<String> lockSensor0 = null;
Entity lockEntity0 = null;
if (lock != null) {
String lockName = null;
if (lock instanceof String) lockName = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, lock, TypeToken.of(String.class));
else if (lock instanceof Map) {
lockName = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, ((Map)lock).get("name"), TypeToken.of(String.class));
Object lockEntity00 = ((Map)lock).get("entity");
if (lockEntity00!=null) {
lockEntity0 = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, lockEntity00, TypeToken.of(Entity.class));
log.debug(WorkflowExecutionContext.this + " using lock " + lockName + " on entity " + lockEntity0);
}
}
if (lockName==null) throw new IllegalArgumentException("Invalid lock object, should be a string or a map indicating a name and optional entity");
lockSensor0 = Sensors.newStringSensor("lock-for-" + lockName);
if (lockEntity0==null) lockEntity0 = getEntity();
}
AttributeSensor<String> lockSensor = lockSensor0;
Entity lockEntity = lockEntity0;
AtomicBoolean mustClearLock = new AtomicBoolean(false);
if (lockSensor!=null) {
// acquire lock
// - step: set-sensor lock-for-count = ${workflow.id}
// require:
// any:
// - when: absent
// - equals: ${workflow.id}
// on-error:
// - retry backoff 50ms increasing 2x up to 5s
String wid = getWorkflowId();
Duration delay = null;
String lastHolder = null;
while (true) {
AtomicReference<String> holder = new AtomicReference<>();
lockEntity.sensors().modify(lockSensor, old -> {
if (old == null || old.equals(wid)) {
if (old != null) {
if (continuationInstructions != null) {
log.debug(WorkflowExecutionContext.this+" reasserting lock on " + lockSensor.getName() + " during replay");
} else {
// i don't think this should be possible
log.warn("Entering block with lock on " + lockSensor.getName() + " when this workflow already holds the lock");
}
} else {
log.debug(WorkflowExecutionContext.this+" acquired lock on " + lockSensor.getName());
}
mustClearLock.set(true);
return Maybe.of(wid);
}
log.debug("Blocked by lock on " + lockSensor.getName() + ", currently held by " + old);
holder.set(old);
return Maybe.absent();
});
if (mustClearLock.get()) break;
// didn't get lock; probably do a retry
try {
if (delay==null || !Objects.equals(lastHolder, holder.get())) {
// reset initially, and if we observe the lock holder to change (highly competitive environment)
delay = Duration.millis(5);
}
Duration ddelay = delay;
Tasks.withBlockingDetails("Waiting for lock on " + lockSensor.getName() +" (held by workflow "+holder.get()+")", () -> { Time.sleep(ddelay); return null; });
// increment delay for next time
delay = Duration.max(delay.multiply(1 + Math.random()), Duration.seconds(5));
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
}
Callable<Object> endHandler;
try {
endHandler = handler.call(); //super.doTaskBodyPossiblyWithLock(context, handler, isReplaying, continuationInstructions);
} finally {
// clear if we set it, whether initially, on real replay, or on injected dangling failure
if (mustClearLock.get()) {
try {
DynamicTasks.waitForLast();
} finally {
if (Entities.isUnmanagingOrNoLongerManaged(getEntity())) {
log.debug("Skipping clearance of lock on "+lockSensor.getName()+" in "+WorkflowExecutionContext.this+" because entity unmanaging here; expect auto-replay on resumption to pick up");
} else {
Threads.runTemporarilyUninterrupted(() -> {
log.debug(WorkflowExecutionContext.this + " releasing lock on " + lockSensor.getName());
((EntityInternal.SensorSupportInternal) lockEntity.sensors()).remove(lockSensor);
});
}
}
}
}
return endHandler.call();
}
boolean continueOnErrorHandledOrNextReplay;
AtomicReference<Boolean> timerCancelled;
public Callable<Object> callSteps() throws Exception {
DynamicTasks.swallowChildrenFailures();
Task<?> timerTask = null;
timerCancelled = new AtomicReference<>(false);
try {
if (timeout != null) {
timerTask = initializeTimerFromWorkflowTimeout(timerTask);
}
RecoveryAndReplay: do {
try {
boolean replaying = continuationInstructions!=null;
Integer replayFromStep = replaying ? continuationInstructions.stepToReplayFrom : null;
if (!replaying) initializeWithoutContinuationInstructions(replayFromStep);
continueOnErrorHandledOrNextReplay = false;
lastErrorHandlerOutput = null;
WorkflowReplayUtils.updateOnWorkflowTaskStartupOrReplay(WorkflowExecutionContext.this, task, getStepsResolved(), !replaying, replayFromStep);
// show task running
updateStatus(WorkflowStatus.RUNNING);
if (replaying) initializeFromContinuationInstructions(replayFromStep);
if (!Objects.equals(taskId, Tasks.current().getId()))
throw new IllegalStateException("Running workflow in unexpected task, " + taskId + " does not match " + task);
int stepsConsidered = 0;
while (currentStepIndex >= 0 && currentStepIndex < getStepsResolved().size()) {
stepsConsidered++;
if (replaying && replayFromStep == null) {
// check step number and clear output before re-running
if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) {
throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance);
}
currentStepInstance.output = null;
currentStepInstance.injectContext(WorkflowExecutionContext.this);
log.debug("Replaying workflow '" + name + "', reusing instance " + currentStepInstance + " for step " + workflowStepReference(currentStepIndex) + ")");
runCurrentStepInstanceApproved(getStepsResolved().get(currentStepIndex));
} else {
runCurrentStepIfPreconditions();
}
replaying = false;
if (continuationInstructions!=null) {
continueOnErrorHandledOrNextReplay = true;
continue RecoveryAndReplay;
}
}
log.debug("Completed workflow "+workflowId+" successfully; step count: "+stepsConsidered+" considered, "+stepsRun+" executed");
if (outputDefinition != null) {
output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, outputDefinition, Object.class);
} else {
if (stepsRun>0) {
// (default is the output of the last step, if there is one, otherwise nothing)
// ((unlike steps, workflow output is not available as a default value, but previous step always is, so there is no need to do this
// before the above; slight chance if onError is triggered by a failure to resolve something in outputDefinition, but that can be ignored))
output = getPreviousStepOutput();
}
}
updateOnSuccessfulCompletion();
} catch (Throwable e) {
try {
Pair<Throwable, WorkflowStatus> unhandledError = handleErrorAtWorkflow(e);
if (unhandledError != null) {
return () -> endWithError(unhandledError.getLeft(), unhandledError.getRight());
}
} catch (Throwable e2) {
// do not propagateIfFatal, we need to handle most throwables
log.debug("Uncaught error in workflow exception handler: "+ e2, e2);
return () -> endWithError(e2, WorkflowStatus.ERROR);
}
}
} while (continueOnErrorHandledOrNextReplay);
} finally {
if (timerTask != null && !timerTask.isDone() && !timerCancelled.get()) {
log.debug("Cancelling " + timerTask + " on completion of this task");
timerTask.cancel(true);
}
}
return this::endWithSuccess;
}
private Pair<Throwable, WorkflowStatus> handleErrorAtWorkflow(Throwable e) {
boolean isTimeout = false;
if (timerCancelled.get()) {
if (Exceptions.getCausalChain(e).stream().anyMatch(cause -> cause instanceof TimeoutException || cause instanceof InterruptedException || cause instanceof CancellationException || cause instanceof RuntimeInterruptedException)) {
// timed out, and a cause is related to cancellation
// NOTE: this just gets used for logging, it is not ever stored by the calling task or returned to a user;
// when the task is cancelled, the only information available to callers is that it was cancelled
// (we could add indications of why but that is a lot of work, and not clear we need it)
TimeoutException timeoutException = new TimeoutException("Timeout after " + timeout + ": " + getName());
timeoutException.initCause(e);
e = timeoutException;
isTimeout = true;
}
}
WorkflowStatus provisionalStatus;
if (Exceptions.isCausedByInterruptInAnyThread(e) || Exceptions.getFirstThrowableMatching(e,
// could do with a more precise check that workflow is cancelled or has timed out; exceptions could come from other unrelated causes (but unlikely)
t -> t instanceof CancellationException || t instanceof TimeoutException) != null) {
if (!Thread.currentThread().isInterrupted()) {
// might be a data model error
if (Exceptions.getFirstThrowableOfType(e, TemplateProcessor.TemplateModelDataUnavailableException.class) != null) {
provisionalStatus = WorkflowStatus.ERROR;
} else {
// cancelled or a subtask interrupted
provisionalStatus = WorkflowStatus.ERROR_CANCELLED;
}
} else {
provisionalStatus = WorkflowStatus.ERROR_CANCELLED;
}
if (provisionalStatus == WorkflowStatus.ERROR_CANCELLED) {
if (!getManagementContext().isRunning()) {
// if mgmt is shutting down we should record that. maybe enough if thread is interrupted we note that.
provisionalStatus = WorkflowStatus.ERROR_SHUTDOWN;
} else if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
provisionalStatus = WorkflowStatus.ERROR_ENTITY_DESTROYED;
}
}
} else {
provisionalStatus = WorkflowStatus.ERROR;
}
boolean errorHandled = false;
if (isTimeout) {
// don't run error handler
log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));
} else if (Thread.currentThread().isInterrupted()) {
// don't run error handler
log.debug("Interrupt in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));
} else if (onError != null && (!(onError instanceof Collection) || !((Collection)onError).isEmpty())) {
try {
log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", running error handler");
Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
errorHandlerTaskId = workflowErrorHandlerTask.getId();
WorkflowErrorHandling.WorkflowErrorHandlingResult result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
if (result != null) {
errorHandled = true;
currentStepInstance.next = WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END);
if (result.output != null) output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);
moveToNextStep("Handled error in workflow around step " + workflowStepReference(currentStepIndex), result.next==null);
if (continuationInstructions!=null || currentStepIndex < getStepsResolved().size()) {
continueOnErrorHandledOrNextReplay = true;
return null;
}
} // else errorHandled remains false and will fail below
} catch (Exception e2) {
Throwable e0 = e;
if (Exceptions.getCausalChain(e2).stream().anyMatch(e3 -> e3==e0)) {
// wraps/rethrows original, don't need to log, but do return the new one
e = e2;
// } else if (Exceptions.isCausedByInterruptInAnyThread(e) && Exceptions.isCausedByInterruptInAnyThread(e2)) {
// // now handled above
// // if both are interrupted we can drop the trace, and return original;
// log.debug("Error where error handler was interrupted, after main thread was also interrupted: " + e2);
// log.trace("Full trace of original error was: " + e, e);
} else {
log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
log.debug("Full trace of original error was: " + e, e);
e = e2;
}
}
} else {
log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", no error handler so rethrowing: " + Exceptions.collapseText(e));
}
if (errorHandled) return null;
if (replayAutomaticallyIfAppropriate(e)) return null;
return Pair.of(e, provisionalStatus);
}
private boolean replayAutomaticallyIfAppropriate(Throwable e) {
if (Boolean.TRUE.equals(replayableAutomatically) && Exceptions.getFirstThrowableOfType(e, DanglingWorkflowException.class)!=null) {
log.info("Automatic replay indicated for "+WorkflowExecutionContext.this+" when detected as dangling on server startup");
currentStepInstance.next = factory(true).makeInstructionsForReplayResuming("Replay resuming on dangling", false);
continueOnErrorHandledOrNextReplay = true;
return true;
}
return false;
}
private void updateOnSuccessfulCompletion() {
// finished -- checkpoint noting previous step and null for current because finished
updateStatus(WorkflowStatus.SUCCESS);
replayableLastStep = STEP_INDEX_FOR_END;
// record how it ended
oldStepInfo.compute(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
if (old == null) old = new OldStepRecord();
old.next = MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next);
old.nextTaskId = null;
return old;
});
oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> {
if (old == null) old = new OldStepRecord();
old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
old.previousTaskId = previousStepTaskId;
return old;
});
}
private void resetWorkflowContextPreviousAndScratchVarsToStep(Integer step, boolean requireLastStep) {
if (step==null) {
// when resuming, keep them as they were set
return;
}
OldStepRecord last = oldStepInfo.get(step);
if (last != null) {
workflowScratchVariables = last.workflowScratch;
previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null);
} else {
if (requireLastStep) {
throw new IllegalStateException("Last step record required for step "+step+" to replay from there");
} else {
// no such step; probably starting something which has never been run (not uncommon), or at a step which was never run (not sure if possible);
// in any case just keep the scratch vars as they were
}
}
// and ensure not null
if (workflowScratchVariables == null) workflowScratchVariables = MutableMap.of();
}
private void initializeFromContinuationInstructions(Integer replayFromStep) {
if (replayFromStep != null && replayFromStep == STEP_INDEX_FOR_START) {
log.debug("Replaying workflow '" + name + "', from start " +
"(was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")");
resetWorkflowContextPreviousAndScratchVarsToStep(replayFromStep, false);
currentStepIndex = 0;
} else if (replayFromStep != null && replayFromStep == STEP_INDEX_FOR_END) {
log.debug("Replaying workflow '" + name + "', from end " +
"(was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")");
currentStepIndex = STEP_INDEX_FOR_END;
resetWorkflowContextPreviousAndScratchVarsToStep(replayFromStep, false);
currentStepInstance = null;
} else {
// replaying workflow
log.debug("Replaying workflow '" + name + "', from step " + (replayFromStep == null ? "<CURRENT>" : workflowStepReference(replayFromStep)) +
" (was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")");
if (replayFromStep == null) {
// replayFromLast should correct the cases below
if (currentStepIndex == null) {
throw new IllegalStateException("Invalid instructions to continue from last bypassing convenience method, and there is no last");
} else if (currentStepInstance == null || currentStepInstance.stepIndex != currentStepIndex) {
throw new IllegalStateException("Invalid instructions to continue from last step which is unknown, bypassing convenience method");
}
} else {
currentStepIndex = replayFromStep;
}
// must reset, but okay if null, in that case we are continuing to a step which hasn't been initialized yet; use scratch vars
resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false);
}
if (continuationInstructions.customWorkflowScratchVariables!=null) {
workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables);
}
}
private void initializeWithoutContinuationInstructions(Integer replayFromStep) {
if (replayFromStep == null && currentStepIndex == null) {
currentStepIndex = 0;
log.debug("Starting workflow '" + name + "', moving to first step " + workflowStepReference(currentStepIndex));
} else if (replayFromStep==null && continueOnErrorHandledOrNextReplay) {
// workflow error handler indicated a next step to run
} else {
// shouldn't come here
continueOnErrorHandledOrNextReplay = false;
throw new IllegalStateException("Should either be replaying or unstarted, but not invoked as replaying, and current=" + currentStepIndex + " replay=" + replayFromStep);
}
}
private Task<?> initializeTimerFromWorkflowTimeout(Task<?> timerTask) {
Task<?> otherTask = Tasks.current();
timerTask = Entities.submit(getEntity(), Tasks.builder().displayName("Timer for " + WorkflowExecutionContext.this.toString() + ":" + taskId)
.body(() -> {
try {
Time.sleep(timeout);
if (!otherTask.isDone()) {
timerCancelled.set(true);
log.debug("Cancelling " + otherTask + " after timeout of " + timeout);
otherTask.cancel(true);
}
} catch (Throwable e) {
if (Exceptions.isRootCauseIsInterruption(e)) {
// normal, just exit
} else {
throw Exceptions.propagate(e);
}
}
}).dynamic(false)
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.tag(BrooklynTaskTags.INESSENTIAL_TASK)
.build());
return timerTask;
}
private Object endWithSuccess() {
WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, output);
persist();
return output;
}
private Object endWithError(Throwable e, WorkflowStatus provisionalStatus) {
updateStatus(provisionalStatus);
WorkflowReplayUtils.updateOnWorkflowError(WorkflowExecutionContext.this, task, e);
try {
log.debug("Error running workflow " + this + "; will persist then rethrow: " + e);
log.trace("Error running workflow " + this + "; will persist then rethrow (details): " + e, e);
} catch (Throwable e2) {
if (Entities.isUnmanagingOrNoLongerManaged(getEntity())) {
log.trace("Error persisting workflow (entity ending) " + this + " after error in workflow; persistence error (details): " + e2, e2);
} else {
log.error("Error persisting workflow " + this + " after error in workflow; persistence error: " + e2);
log.debug("Error persisting workflow " + this + " after error in workflow; persistence error (details): " + e2, e2);
log.warn("Error running workflow " + this + ", rethrowing without persisting because of persistence error (above): " + e);
}
log.trace("Error running workflow " + this + ", rethrowing without persisting because of persistence error (above): " + e, e);
}
throw Exceptions.propagate(e);
}
protected void runCurrentStepIfPreconditions() {
WorkflowStepDefinition step = getStepsResolved().get(currentStepIndex);
if (step!=null) {
currentStepInstance = new WorkflowStepInstanceExecutionContext(currentStepIndex, step, WorkflowExecutionContext.this);
DslPredicates.DslPredicate conditionResolved = step.getConditionResolved(currentStepInstance);
if (conditionResolved!=null) {
if (log.isTraceEnabled()) log.trace("Considering condition "+step.condition+" for "+ workflowStepReference(currentStepIndex));
boolean conditionMet = DslPredicates.evaluateDslPredicateWithBrooklynObjectContext(conditionResolved, WorkflowExecutionContext.this, getEntityOrAdjunctWhereRunning());
if (log.isTraceEnabled()) log.trace("Considered condition "+step.condition+" for "+ workflowStepReference(currentStepIndex)+": "+conditionMet);
if (!conditionMet) {
moveToNextStep("Skipping step "+ workflowStepReference(currentStepIndex), false);
return;
}
}
// no condition or condition met -- record and run the step
runCurrentStepInstanceApproved(step);
} else {
// moving to floor/ceiling in treemap made sense when numero-ordered IDs are used, but not with list
throw new IllegalStateException("Cannot find step "+currentStepIndex);
}
}
private void runCurrentStepInstanceApproved(WorkflowStepDefinition step) {
stepsRun++;
Task<?> t;
if (continuationInstructions!=null) {
t = step.newTaskContinuing(currentStepInstance, continuationInstructions);
continuationInstructions = null;
} else {
t = step.newTask(currentStepInstance);
}
// about to run -- checkpoint noting current and previous steps, and updating replayable from info
OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> {
if (old == null) old = new OldStepRecord();
old.countStarted++;
if (!workflowScratchVariables.isEmpty())
old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
else old.workflowScratch = null;
old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
old.previousTaskId = previousStepTaskId;
old.nextTaskId = null;
return old;
});
WorkflowReplayUtils.updateReplayableFromStep(WorkflowExecutionContext.this, step);
oldStepInfo.compute(previousStepIndex==null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
if (old==null) old = new OldStepRecord();
old.next = MutableSet.<Integer>of(currentStepIndex).putAll(old.next);
old.nextTaskId = t.getId();
return old;
});
errorHandlerContext = null;
errorHandlerTaskId = null;
currentStepInstance.next = null; // clear, eg if was set from a previous run; will be reset from step definition
// but don't clear output, in case a step is returning to itself and wants to reference previous_step.output
persist();
BiConsumer<Object,Object> onFinish = (output,overrideNext) -> {
currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step);
if (output!=null) currentStepInstance.output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, output, Object.class);
};
// now run the step
try {
Duration duration = step.getTimeout();
if (duration!=null) {
boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration);
if (isEnded) {
currentStepInstance.output = t.getUnchecked();
} else {
t.cancel(true);
throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName());
}
} else {
currentStepInstance.output = DynamicTasks.queue(t).getUnchecked();
}
// allow output to be customized / overridden
onFinish.accept(step.output, null);
} catch (Exception e) {
handleErrorAtStep(step, t, onFinish, e);
}
oldStepInfo.compute(currentStepIndex, (index, old) -> {
if (old==null) {
log.warn("Lost old step info for "+this+", step "+index);
old = new OldStepRecord();
}
old.countCompleted++;
// okay if this gets picked up by accident because we will check the stepIndex it records against the currentStepIndex,
// and ignore it if different
old.context = currentStepInstance;
return old;
});
previousStepTaskId = currentStepInstance.taskId;
previousStepIndex = currentStepIndex;
moveToNextStep("Completed step "+ workflowStepReference(currentStepIndex), false);
}
private void handleErrorAtStep(WorkflowStepDefinition step, Task<?> stepTaskThrowingError, BiConsumer<Object, Object> onFinish, Exception error) {
WorkflowErrorHandling.handleErrorAtStep(getEntity(), step, currentStepInstance, stepTaskThrowingError, onFinish, error, null);
}
private void logWarnOnExceptionOrDebugIfKnown(Exception e, String msg) {
WorkflowErrorHandling.logWarnOnExceptionOrDebugIfKnown(getEntity(), e, msg);
}
private void moveToNextStep(String prefix, boolean inferredNext) {
prefix = prefix + "; ";
Object specialNext = WorkflowReplayUtils.getNext(currentStepInstance);
continuationInstructions = specialNext instanceof WorkflowStepDefinition.ReplayContinuationInstructions ? (WorkflowStepDefinition.ReplayContinuationInstructions) specialNext : null;
if (continuationInstructions!=null) {
log.debug(prefix + "proceeding to custom replay: "+continuationInstructions);
return;
}
if (specialNext==null) {
currentStepIndex++;
if (currentStepIndex < getStepsResolved().size()) {
log.debug(prefix + "moving to sequential next step " + workflowStepReference(currentStepIndex));
} else {
log.debug(prefix + "no further steps: Workflow completed");
}
} else if (specialNext instanceof String) {
String explicitNext = (String)specialNext;
Maybe<Pair<Integer, Boolean>> nextResolved = getIndexOfStepId(explicitNext);
if (nextResolved.isAbsent()) {
log.warn(prefix + (inferredNext ? "inferred" : "explicit") + " next step '"+explicitNext+"' not found (failing)");
// throw
nextResolved.get();
}
if (nextResolved.get().getLeft()==null) {
throw new IllegalArgumentException("Next step '"+explicitNext+"' not supported here");
}
currentStepIndex = nextResolved.get().getLeft();
if (nextResolved.get().getRight()) {
if (currentStepIndex < getStepsResolved().size()) {
log.debug(prefix + "moving to "+(inferredNext ? "inferred" : "explicit")+" next step " + workflowStepReference(currentStepIndex) + " for token '" + explicitNext + "'");
} else {
log.debug(prefix + (inferredNext ? "inferred" : "explicit") + " next step '"+explicitNext+"': Workflow completed");
}
} else {
log.debug(prefix + "moving to "+(inferredNext ? "inferred" : "explicit")+" next step " + workflowStepReference(currentStepIndex) + " for id '" + explicitNext + "'");
}
} else {
throw new IllegalStateException("Illegal next definition: "+specialNext+" (type "+specialNext.getClass()+")");
}
}
String workflowStepReference(Integer index) {
if (index==null) return workflowId+"-<no-step>";
if (index>=getStepsResolved().size()) return getWorkflowStepReference(index, "<END>", false);
return getWorkflowStepReference(index, getStepsResolved().get(index));
}
public Body withIntro(Runnable intro) {
this.intro = intro;
return this;
}
}
public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) {
if (next==null) return Maybe.absent("Null step ID supplied");
Function<WorkflowExecutionContext, Integer> predefined = PREDEFINED_NEXT_TARGETS.get(next.toLowerCase());
if (predefined!=null) return Maybe.of(Pair.of(predefined.apply(this), true));
Pair<Integer, WorkflowStepDefinition> explicit = getStepsWithExplicitIdById().get(next);
if (explicit!=null) return Maybe.of(Pair.of(explicit.getLeft(), false));
return Maybe.absent(new NoSuchElementException("Step with ID '"+next+"' not found"));
}
String getWorkflowStepReference(int index, WorkflowStepDefinition step) {
return getWorkflowStepReference(index, step!=null ? step.id : null, false);
}
String getWorkflowStepReference(int index, String optionalStepId, boolean isError) {
// error handler step number not always available here,
// and risk of ID ambiguity with error-handler phrase;
// only for logging so this is okay;
// for canonical usage, prefer the Task-based method below
return workflowId+(index>=0 ? "-"+(index+1) : (index==STEP_INDEX_FOR_ERROR_HANDLER && isError) ? "" : "-"+indexCode(index))
+(Strings.isNonBlank(optionalStepId) ? "-"+optionalStepId : "")
+(isError ? "-"+LABEL_FOR_ERROR_HANDLER : "");
}
public String getWorkflowStepReference(Task<?> t) {
BrooklynTaskTags.WorkflowTaskTag wt = BrooklynTaskTags.getWorkflowTaskTag(t, false);
if (wt.getErrorHandlerIndex()!=null) {
// formula below not suitable for error tasks, but the name should be good
return t.getDisplayName();
}
return wt.getWorkflowId()
+(wt.getStepIndex()!=null && wt.getStepIndex()>=0 && wt.getStepIndex()<stepsDefinition.size() ? "-"+(wt.getStepIndex()+1) : "")
+(wt.getErrorHandlerIndex()!=null ? "-error-handler-"+(wt.getErrorHandlerIndex()+1) : "")
;
}
private String indexCode(int index) {
// these numbers shouldn't be used for much, but they are used in a few places :(
if (index==STEP_INDEX_FOR_START) return STEP_TARGET_NAME_FOR_START;
if (index==STEP_INDEX_FOR_END) return STEP_TARGET_NAME_FOR_END;
if (index==STEP_INDEX_FOR_ERROR_HANDLER) return LABEL_FOR_ERROR_HANDLER;
return "neg-"+(index); // unknown
}
}