| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.core.workflow; |
| |
| import com.fasterxml.jackson.annotation.JsonIgnore; |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.annotation.JsonDeserialize; |
| import com.google.common.collect.Iterables; |
| import com.google.common.reflect.TypeToken; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.ManagementContext; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext; |
| import org.apache.brooklyn.api.objs.BrooklynObject; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.config.ConfigKey; |
| import org.apache.brooklyn.core.effector.Effectors; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityAdjuncts; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; |
| import org.apache.brooklyn.core.resolve.jackson.JsonPassThroughDeserializer; |
| import org.apache.brooklyn.core.sensor.Sensors; |
| import org.apache.brooklyn.core.typereg.RegisteredTypes; |
| import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration; |
| import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; |
| import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.config.ConfigBag; |
| import org.apache.brooklyn.util.core.flags.BrooklynTypeNameResolution; |
| import org.apache.brooklyn.util.core.predicates.DslPredicates; |
| import org.apache.brooklyn.util.core.task.DynamicTasks; |
| import org.apache.brooklyn.util.core.task.TaskBuilder; |
| import org.apache.brooklyn.util.core.task.TaskTags; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.core.text.TemplateProcessor; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; |
| import org.apache.brooklyn.util.guava.Maybe; |
| import org.apache.brooklyn.util.javalang.Threads; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| import java.time.Instant; |
| import java.util.*; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| |
| import static org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT; |
| |
| @JsonInclude(JsonInclude.Include.NON_NULL) |
| public class WorkflowExecutionContext { |
| |
/** Logger used by this class and its inner classes. */
| private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionContext.class); |
| |
| public static final String LABEL_FOR_ERROR_HANDLER = "error-handler"; |
| |
// Negative sentinel step indexes, used where a real step index (0 for the first step) does not apply.
| public static final int STEP_INDEX_FOR_START = -1; |
| public static final int STEP_INDEX_FOR_END = -2; |
| public static final int STEP_INDEX_FOR_ERROR_HANDLER = -3; |
| |
// Reserved target names; these are the keys of PREDEFINED_NEXT_TARGETS and may not be used as step IDs.
| public static final String STEP_TARGET_NAME_FOR_START = "start"; |
| public static final String STEP_TARGET_NAME_FOR_END = "end"; |
| public static final String STEP_TARGET_NAME_FOR_LAST = "last"; |
| public static final String STEP_TARGET_NAME_FOR_HERE = "here"; |
| public static final String STEP_TARGET_NAME_FOR_EXIT = "exit"; |
| public static final String STEP_TARGET_NAME_FOR_DEFAULT = "default"; |
| |
/**
 * Unmodifiable map from each reserved target name to a function computing the step index it denotes.
 * Called with a null context, the function returns the sentinel index for "start" and "end" and null
 * for every other name. Called with a context, "start" gives 0, "end" gives the number of step
 * definitions, "last" gives {@code replayableLastStep}, "default" gives {@code currentStepIndex+1},
 * and "here" and "exit" give null.
 * NOTE(review): "default" unboxes {@code currentStepIndex}, so it presumably must only be used once
 * a step index has been set -- confirm against callers.
 */
| public static final Map<String, Function<WorkflowExecutionContext,Integer>> PREDEFINED_NEXT_TARGETS = MutableMap.<String, Function<WorkflowExecutionContext,Integer>>of( |
| STEP_TARGET_NAME_FOR_START, c -> c==null? STEP_INDEX_FOR_START : 0, |
| STEP_TARGET_NAME_FOR_END, c -> c==null ? STEP_INDEX_FOR_END : c.stepsDefinition.size(), |
| STEP_TARGET_NAME_FOR_LAST, c -> c==null ? null : c.replayableLastStep, |
| STEP_TARGET_NAME_FOR_HERE, c -> c==null ? null : null, |
| STEP_TARGET_NAME_FOR_EXIT, c -> c==null ? null : null, |
| STEP_TARGET_NAME_FOR_DEFAULT, c -> c==null ? null : c.currentStepIndex+1).asUnmodifiable(); |
| |
/** Workflow name; also used as the default task display name and in {@link #toString()}. */
| String name; |
/** The adjunct this workflow runs on, or null when it runs directly on an entity. */
| @Nullable BrooklynObject adjunct; |
/** The entity this workflow runs on, or the entity of the adjunct it runs on. */
| Entity entity; |
| |
/**
 * Status of a workflow. Each constant carries four flags: whether the workflow has started,
 * whether it has ended, whether it ended in error, and whether it is expirable.
 */
public enum WorkflowStatus {
    STAGED(false, false, false, false),
    RUNNING(true, false, false, false),
    SUCCESS(true, true, false, true),
    /** useful information, usually cannot be persisted by the time we've set this the first time, but could set on rebind */
    ERROR_SHUTDOWN(true, true, true, false),
    /** task failed because entity destroyed */
    ERROR_ENTITY_DESTROYED(true, true, true, true),
    /** task cancelled, timeout, or other interrupt, usually recursively (but not entity destroyed or server shutdown) */
    ERROR_CANCELLED(true, true, true, true),
    /** any other error, e.g. workflow step failed or data not immediately available (the interrupt used internally is not relevant) */
    ERROR(true, true, true, true);

    public final boolean started;
    public final boolean ended;
    public final boolean error;
    public final boolean expirable;

    WorkflowStatus(boolean started, boolean ended, boolean error, boolean expirable) {
        this.started = started;
        this.ended = ended;
        this.error = error;
        this.expirable = expirable;
    }
}
| |
// status and lastStatusChangeTime are always written together, by updateStatus(..)
| WorkflowStatus status; |
| Instant lastStatusChangeTime; |
| |
// parent is transient; getParent() re-resolves it from parentTag when it is null
| @JsonIgnore private transient WorkflowExecutionContext parent; |
| private BrooklynTaskTags.WorkflowTaskTag parentTag; |
| |
| // should be treated as raw json |
| @JsonDeserialize(contentUsing = JsonPassThroughDeserializer.class) |
| List<Object> stepsDefinition; |
| |
// when set, getTask(true) refuses to return the task unless this applies to getConditionTarget()
| DslPredicates.DslPredicate condition; |
| |
| @JsonInclude(JsonInclude.Include.NON_EMPTY) |
| Map<String,Object> input = MutableMap.of(); |
// cache of values already resolved by getInputMaybe(..), keyed by input name
| @JsonIgnore // persist as sensor but not via REST in case it has secrets resolved |
| Map<String,Object> inputResolved = MutableMap.of(); |
| |
| Object outputDefinition; |
| Object output; |
| |
| Object lock; |
| |
| Duration timeout; |
| Object onError; |
| |
// initially the same as the ID of the first task created for the workflow (see the constructor)
| String workflowId; |
| /** current or most recent executing task created for this workflow, corresponding to task */ |
| String taskId; |
// transient; getTask(..) looks it up again by taskId when it is null
| transient Task<Object> task; |
| |
| @JsonProperty("retention") |
| WorkflowRetentionAndExpiration.WorkflowRetentionSettings retention; |
| |
| /** all tasks created for this workflow */ |
| Set<WorkflowReplayUtils.WorkflowReplayRecord> replays = MutableSet.of(); |
| transient WorkflowReplayUtils.WorkflowReplayRecord replayCurrent = null; |
| /** null if no replay point, otherwise step number of the last completed replay point, or -1 if should replay from start and -2 if completed successfully */ |
| Integer replayableLastStep; |
| Boolean replayableFromStart; |
| Boolean replayableAutomatically; |
// when TRUE, Factory.isDisabled() reports true unless the factory was created to allow internal use
| Boolean replayableDisabled; |
| Boolean idempotentAll; |
| |
| Integer currentStepIndex; |
| Integer previousStepIndex; |
| String previousStepTaskId; |
| |
| WorkflowStepInstanceExecutionContext currentStepInstance; |
| |
| /** set if an error handler is the last thing which ran */ |
| String errorHandlerTaskId; |
| /** set for the last _step_ inside the error handler */ |
| WorkflowStepInstanceExecutionContext errorHandlerContext; |
| |
// per-step history keyed by step index; getPreviousStepOutput() reads the previous step's output from here
| Map<Integer, OldStepRecord> oldStepInfo = MutableMap.of(); |
| |
| @JsonInclude(JsonInclude.Include.NON_EMPTY) |
| public static class OldStepRecord { |
| /** count of runs started */ |
| int countStarted = 0; |
| /** count of runs completed */ |
| int countCompleted = 0; |
| |
| /** context for last _completed_ instance of step */ |
| WorkflowStepInstanceExecutionContext context; |
| /** is step replayable */ |
| Boolean replayableFromHere; |
| /** scratch for last _started_ instance of step */ |
| Map<String,Object> workflowScratch; |
| /** steps that immediately preceded this, updated when _this_ step started, with most recent first */ |
| Set<Integer> previous; |
| /** steps that immediately followed this, updated when _next_ step started, with most recent first */ |
| Set<Integer> next; |
| |
| String previousTaskId; |
| String nextTaskId; |
| } |
| |
// workflow-scoped scratch variables; getConditionTarget() reads the "target" entry from here
| Map<String,Object> workflowScratchVariables = MutableMap.of(); |
| |
// timestamps recorded per string key for retries; exposed via getRetryRecords()
// NOTE(review): the meaning of the key is not visible in this part of the file -- confirm against the retry step
| @JsonInclude(JsonInclude.Include.NON_EMPTY) |
| Map<String,List<Instant>> retryRecords = MutableMap.of(); |
| |
| // deserialization constructor |
| private WorkflowExecutionContext() {} |
| |
| public static WorkflowExecutionContext newInstancePersisted(BrooklynObject entityOrAdjunctWhereRunning, WorkflowContextType wcType, String name, ConfigBag paramsDefiningWorkflow, |
| Collection<ConfigKey<?>> extraConfigKeys, ConfigBag extraInputs, Map<String, Object> optionalTaskFlags) { |
| WorkflowExecutionContext w = newInstanceUnpersistedWithParent(entityOrAdjunctWhereRunning, null, wcType, name, paramsDefiningWorkflow, extraConfigKeys, extraInputs, optionalTaskFlags); |
| w.persist(); |
| return w; |
| } |
| |
/** The kind of construct a workflow is being created for. */
public enum WorkflowContextType {
    SENSOR,
    EFFECTOR,
    POLICY,
    NESTED_WORKFLOW,
    OTHER
}
| |
/**
 * Creates a workflow context from its defining configuration, without persisting it.
 * <p>
 * Parameter definitions are taken from {@code WorkflowCommonConfig.PARAMETER_DEFS} and then from
 * {@code extraConfigKeys}, which replace any same-named definition. Inputs are taken from
 * {@code WorkflowCommonConfig.INPUT} and then from {@code extraInputs}; parameters with a default value
 * that were not supplied get that default. The steps and the error handler steps are resolved here so
 * that an invalid definition fails at creation time. The returned context has status {@code STAGED}.
 *
 * @param entityOrAdjunctWhereRunning the entity or adjunct the workflow runs on
 * @param parent the parent workflow, or null if there is none
 * @param wcType the kind of construct the workflow is created for; only whether it is
 *               {@code NESTED_WORKFLOW} is used here, when initialising the replayable settings
 * @param name the workflow name
 * @param paramsDefiningWorkflow the configuration defining the workflow
 * @param extraConfigKeys additional parameter definitions, or null
 * @param extraInputs additional inputs, or null
 * @param optionalTaskFlags flags applied to the task created for the workflow, or null
 */
| public static WorkflowExecutionContext newInstanceUnpersistedWithParent(BrooklynObject entityOrAdjunctWhereRunning, WorkflowExecutionContext parent, |
| WorkflowContextType wcType, String name, ConfigBag paramsDefiningWorkflow, |
| Collection<ConfigKey<?>> extraConfigKeys, ConfigBag extraInputs, Map<String, Object> optionalTaskFlags) { |
| |
| // parameter defs |
| Map<String,ConfigKey<?>> parameters = MutableMap.of(); |
| Maybe<BrooklynClassLoadingContext> loader = RegisteredTypes.getClassLoadingContextMaybe(entityOrAdjunctWhereRunning); |
| Effectors.parseParameters(paramsDefiningWorkflow.get(WorkflowCommonConfig.PARAMETER_DEFS), loader.orNull()).forEach(p -> parameters.put(p.getName(), Effectors.asConfigKey(p))); |
| if (extraConfigKeys!=null) extraConfigKeys.forEach(p -> parameters.put(p.getName(), p)); |
| |
| // inputs, unresolved first |
| ConfigBag inputRaw = ConfigBag.newInstance(); |
| inputRaw.putAll(paramsDefiningWorkflow.get(WorkflowCommonConfig.INPUT)); |
| if (extraInputs!=null) inputRaw.putAll(extraInputs.getAllConfig()); |
// apply parameter defaults only for parameters the caller did not supply
| parameters.values().forEach(p -> { |
| if (p.hasDefaultValue() && !inputRaw.containsKey(p.getName())) inputRaw.put((ConfigKey)p, p.getDefaultValue()); |
| }); |
| |
| MutableMap<String,Object> input = MutableMap.of(); |
| inputRaw.forEach( (k,v) -> { |
| ConfigKey<?> kc = parameters.get(k); |
| // coerce, but don't freemarker resolve inputs because that's the job of the caller (e.g. in nested workflow, inputs resolved relative to calling workflow) |
| Object v2 = kc == null ? v : inputRaw.get(kc); |
| input.put(k, v2); |
| }); |
| |
| WorkflowExecutionContext w = new WorkflowExecutionContext(entityOrAdjunctWhereRunning, parent, name, |
| paramsDefiningWorkflow.get(WorkflowCommonConfig.STEPS), |
| input, |
| paramsDefiningWorkflow.get(WorkflowCommonConfig.OUTPUT), |
| WorkflowReplayUtils.updaterForReplayableAtWorkflow(paramsDefiningWorkflow, wcType == WorkflowContextType.NESTED_WORKFLOW), |
| optionalTaskFlags); |
| |
| w.getStepsResolved(); // ensure steps resolve at this point; should be true even if condition doesn't apply (though input might not be valid without condition) |
| |
| w.retention = WorkflowRetentionParser.parse(paramsDefiningWorkflow.get(WorkflowCommonConfig.RETENTION), w).init(w); |
| w.lock = paramsDefiningWorkflow.get(WorkflowCommonConfig.LOCK); |
| w.timeout = paramsDefiningWorkflow.get(WorkflowCommonConfig.TIMEOUT); |
| w.onError = paramsDefiningWorkflow.get(WorkflowCommonConfig.ON_ERROR); |
| // fail fast if error steps not resolveable |
| WorkflowStepResolution.resolveSubSteps(w.getManagementContext(), "error handling", WorkflowErrorHandling.wrappedInListIfNecessaryOrNullIfEmpty(w.onError)); |
| |
| // some fields need to be resolved at setting time, in the context of the workflow |
| w.setCondition(w.resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_STARTING_POST_INPUT, paramsDefiningWorkflow.getStringKey(WorkflowCommonConfig.CONDITION.getName()), WorkflowCommonConfig.CONDITION.getTypeToken())); |
| |
| // finished -- checkpoint noting this has been created but not yet started |
| w.updateStatus(WorkflowStatus.STAGED); |
| return w; |
| } |
| |
/**
 * Initialises the identity, definition and inputs of the workflow, and builds (but does not submit)
 * the task for its initial run. The ID of that task becomes both the workflow ID and the current task ID.
 *
 * @param entityOrAdjunctWhereRunning an {@link Entity}, or otherwise an adjunct that is cast to
 *        {@code EntityAdjuncts.EntityAdjunctProxyable} to obtain its entity
 * @param parent the parent workflow, or null
 * @param name the workflow name; used as the task display name unless the task flags already give one
 * @param stepsDefinition the raw step definitions
 * @param input the (unresolved) inputs
 * @param output stored as the output definition
 * @param replayableInitializer if not null, applied to this instance once the fields above are set
 * @param optionalTaskFlags flags passed to the task builder, or null
 */
| protected WorkflowExecutionContext(BrooklynObject entityOrAdjunctWhereRunning, WorkflowExecutionContext parent, String name, |
| List<Object> stepsDefinition, Map<String,Object> input, Object output, |
| Consumer<WorkflowExecutionContext> replayableInitializer, Map<String, Object> optionalTaskFlags) { |
| initParent(parent); |
| this.name = name; |
// exactly one of the two cases applies: running on an entity (no adjunct), or on an adjunct (entity taken from it)
| this.adjunct = entityOrAdjunctWhereRunning instanceof Entity ? null : entityOrAdjunctWhereRunning; |
| this.entity = entityOrAdjunctWhereRunning instanceof Entity ? (Entity)entityOrAdjunctWhereRunning : ((EntityAdjuncts.EntityAdjunctProxyable)entityOrAdjunctWhereRunning).getEntity(); |
| this.stepsDefinition = stepsDefinition; |
| |
| this.input = input; |
| this.outputDefinition = output; |
| if (replayableInitializer!=null) replayableInitializer.accept(this); |
| |
| TaskBuilder<Object> tb = Tasks.builder().dynamic(true); |
| if (optionalTaskFlags!=null) tb.flags(optionalTaskFlags); |
| if (Strings.isBlank(tb.getDisplayName())) tb.displayName(name); |
| task = tb.body(new Body()).build(); |
| WorkflowReplayUtils.updateOnWorkflowStartOrReplay(this, task, "initial run", null); |
// the IDs must be assigned before the workflow tag is created below, since that tag is built from this instance
// NOTE(review): presumably tagForWorkflow reads workflowId -- confirm in BrooklynTaskTags
| workflowId = taskId = task.getId(); |
| TaskTags.addTagDynamically(task, BrooklynTaskTags.WORKFLOW_TAG); |
| TaskTags.addTagDynamically(task, BrooklynTaskTags.tagForWorkflow(this)); |
| |
| // currently workflow ID is the same as the task ID assigned initially. (but if replayed they will be different.) |
| // there is no deep reason or need for this, it is just convenient, and used for tests. |
| //this.workflowId = Identifiers.makeRandomId(8); |
| } |
| |
| public void initParent(WorkflowExecutionContext parent) { |
| this.parent = parent; |
| this.parentTag = parent==null ? null : BrooklynTaskTags.tagForWorkflow(parent); |
| } |
| |
| @JsonIgnore |
| public WorkflowExecutionContext getParent() { |
| if (parent==null && parentTag!=null) { |
| Entity entity = getManagementContext().getEntityManager().getEntity(parentTag.getEntityId()); |
| if (entity==null) { |
| log.warn("Parent workflow "+parentTag+" for "+this+" is on an entity no longer known; unparenting this workflow"); |
| parentTag = null; |
| } else { |
| parent = new WorkflowStatePersistenceViaSensors(getManagementContext()).getWorkflows(entity).get(parentTag.getWorkflowId()); |
| if (parent==null) { |
| log.warn("Parent workflow "+parentTag+" for "+this+" is no longer known; unparenting this workflow"); |
| parentTag = null; |
| } |
| } |
| } |
| return parent; |
| } |
| |
| public static void validateSteps(ManagementContext mgmt, List<WorkflowStepDefinition> steps, boolean alreadyValidatedIndividualSteps) { |
| if (!alreadyValidatedIndividualSteps) { |
| steps.forEach(w -> w.validateStep(mgmt, null)); |
| } |
| |
| computeStepsWithExplicitIdById(steps); |
| } |
| |
| static Map<String,Pair<Integer,WorkflowStepDefinition>> computeStepsWithExplicitIdById(List<WorkflowStepDefinition> steps) { |
| Map<String,Pair<Integer,WorkflowStepDefinition>> stepsWithExplicitId = MutableMap.of(); |
| for (int i = 0; i<steps.size(); i++) { |
| WorkflowStepDefinition s = steps.get(i); |
| if (s.id != null) { |
| if (PREDEFINED_NEXT_TARGETS.containsKey(s.id.toLowerCase())) |
| throw new IllegalStateException("Token '" + s + "' is a reserved word and cannot be used as a step ID"); |
| |
| Pair<Integer, WorkflowStepDefinition> old = stepsWithExplicitId.put(s.id, Pair.of(i, s)); |
| if (old != null) throw new IllegalStateException("Same step ID '" + s + "' used for multiple steps ("+(old.getLeft()+1)+" and "+(i+1)+")"); |
| } |
| } |
| return stepsWithExplicitId; |
| } |
| |
| public void setCondition(DslPredicates.DslPredicate condition) { |
| this.condition = condition; |
| } |
| |
| @Override |
| public String toString() { |
| return "Workflow<" + name + " - " + workflowId + ">"; |
| } |
| |
| @JsonIgnore |
| public BrooklynObject getEntityOrAdjunctWhereRunning() { |
| if (adjunct!=null) return adjunct; |
| return entity; |
| } |
| |
| public BrooklynTaskTags.WorkflowTaskTag getParentTag() { |
| return parentTag; |
| } |
| |
| public Map<String, Object> getWorkflowScratchVariables() { |
| return workflowScratchVariables; |
| } |
| |
| public Map<String, List<Instant>> getRetryRecords() { |
| return retryRecords; |
| } |
| |
| @JsonIgnore |
| public Object getConditionTarget() { |
| if (getWorkflowScratchVariables()!=null) { |
| Object v = getWorkflowScratchVariables().get("target"); |
| // should we also set the entity? otherwise it will take from the task. but that should only apply |
| // in a task where the context entity is set, so for now rely on that. |
| if (v!=null) return v; |
| } |
| return getEntityOrAdjunctWhereRunning(); |
| } |
| |
| @JsonIgnore |
| public Maybe<Task<Object>> getTask(boolean checkCondition) { |
| if (checkCondition && condition!=null) { |
| if (!condition.apply(getConditionTarget())) return Maybe.absent(new IllegalStateException("This workflow cannot be run at present: condition not satisfied")); |
| } |
| |
| if (task==null) { |
| if (taskId !=null) { |
| task = (Task<Object>) getManagementContext().getExecutionManager().getTask(taskId); |
| } |
| if (task==null) { |
| return Maybe.absent(new IllegalStateException("Task for "+this+" no longer available")); |
| } |
| } |
| return Maybe.of(task); |
| } |
| |
| public Factory factory(boolean allowInternallyEvenIfDisabled) { |
| return new Factory(allowInternallyEvenIfDisabled); |
| } |
| |
| public class Factory { |
| private final boolean allowInternallyEvenIfDisabled; |
| |
| protected Factory(boolean allowInternallyEvenIfDisabled) { |
| this.allowInternallyEvenIfDisabled = allowInternallyEvenIfDisabled; |
| } |
| |
| public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromStep(int stepIndex0, String reason, boolean forced) { |
| if (!forced) checkNotDisabled(); |
| int stepIndex = stepIndex0; |
| if (!forced) { |
| stepIndex = Maybe.ofDisallowingNull(WorkflowReplayUtils.findNearestReplayPoint(WorkflowExecutionContext.this, stepIndex0)) |
| .orThrow(() -> new IllegalStateException("Workflow is not replayable: no replay points found backtracking from " + stepIndex0)); |
| log.debug("Request to replay from step " + stepIndex0 + ", nearest replay point is " + stepIndex); |
| } |
| return new WorkflowStepDefinition.ReplayContinuationInstructions(stepIndex, reason, null, forced); |
| } |
| |
| public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromLastReplayable(String reason, boolean forced) { |
| return makeInstructionsForReplayingFromStep(replayableLastStep != null ? replayableLastStep : STEP_INDEX_FOR_START, reason, forced); |
| } |
| |
| public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayingFromStart(String reason, boolean forced) { |
| return makeInstructionsForReplayingFromStep(STEP_INDEX_FOR_START, reason, forced); |
| } |
| |
| public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResuming(String reason, boolean forced) { |
| return makeInstructionsForReplayResuming(reason, forced, null); |
| } |
| |
| public WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResumingForcedWithCustom(String reason, Runnable code) { |
| return makeInstructionsForReplayResuming(reason, true, code); |
| } |
| |
| protected WorkflowStepDefinition.ReplayContinuationInstructions makeInstructionsForReplayResuming(String reason, boolean forced, Runnable code) { |
| if (!forced) checkNotDisabled(); |
| Integer replayFromStep = null; |
| if (currentStepIndex == null) { |
| // not yet started |
| replayFromStep = STEP_INDEX_FOR_START; |
| } else if (currentStepInstance == null || currentStepInstance.stepIndex != currentStepIndex) { |
| // replaying from a different step, or current step which has either not run or completed but didn't save |
| log.debug("Replaying workflow '" + name + "', cannot replay within step " + currentStepIndex + " because step instance not known; will reinitialize then replay that step"); |
| replayFromStep = currentStepIndex; |
| } |
| |
| if (!forced && replayFromStep == null) { |
| // instructions should be made even if subworkflows might reject them; that's the intention of "resume" without force, vs replay from last |
| if (!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT, allowInternallyEvenIfDisabled)) { |
| if (code != null) { |
| // we could allow this, but we don't need it |
| throw new IllegalArgumentException("Cannot supply code to here without forcing as workflow does not support replay resuming at this point"); |
| } |
| log.debug("Request to replay resuming " + WorkflowExecutionContext.this + " at non-idempotent step; rolling back to " + replayableLastStep); |
| if (replayableLastStep == null) { |
| throw new IllegalArgumentException("Cannot replay resuming as there are no replay points and last step " + currentStepIndex + " is not idempotent"); |
| } |
| return makeInstructionsForReplayingFromStep(replayableLastStep, reason, false); |
| } |
| } |
| |
| return new WorkflowStepDefinition.ReplayContinuationInstructions(replayFromStep, reason, code, forced); |
| } |
| |
| public Task<Object> createTaskReplaying(WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) { |
| return createTaskReplaying(null, continuationInstructions); |
| } |
| |
| public Task<Object> createTaskReplaying(Runnable intro, WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) { |
| if (continuationInstructions==null || !continuationInstructions.forced) checkNotDisabled(); |
| if (task != null && !task.isDone()) { |
| if (!task.isSubmitted()) { |
| if (parent!=null && parent.getReplays().size()>1) { |
| log.debug("Abandoning sub-workflow task that was never submitted, not unusual as parent seems to be replaying: " + task + " for " + WorkflowExecutionContext.this); |
| } else { |
| log.warn("Abandoning workflow task that was never submitted: " + task + " for " + WorkflowExecutionContext.this); |
| } |
| } else { |
| if (isSubmitterAncestor(Tasks.current(), task)) { |
| // not sure we need this check |
| log.debug("Replaying containing workflow " + WorkflowExecutionContext.this + " in task " + task + " which is an ancestor of " + Tasks.current()); |
| } else { |
| log.warn("Unable to replay workflow " + WorkflowExecutionContext.this + " from " + Tasks.current() + " because workflow task " + task + " is ongoing; will delay up to 1s then retry"); |
| // there can be a slight race between tasks ending and the workflow reporting a failure and replaying; |
| // esp in tests, but also in real world, forgive such a situation by delaying the replay for a short time |
| if (!task.blockUntilEnded(Duration.ONE_SECOND)) { |
| log.warn("Unable to replay workflow " + WorkflowExecutionContext.this + " from " + Tasks.current() + " because workflow task " + task + " is ongoing (waited 1s, still ongoing; so rethrowing)"); |
| throw new IllegalStateException("Cannot replay ongoing workflow, given " + continuationInstructions); |
| } |
| } |
| } |
| } |
| |
| String explanation = continuationInstructions.customBehaviorExplanation != null ? continuationInstructions.customBehaviorExplanation : "no explanation"; |
| task = Tasks.builder().dynamic(true).displayName(name + " (" + explanation + ")") |
| .tag(BrooklynTaskTags.tagForWorkflow(WorkflowExecutionContext.this)) |
| .tag(BrooklynTaskTags.WORKFLOW_TAG) |
| .body(new Body(continuationInstructions).withIntro(intro)).build(); |
| WorkflowReplayUtils.updateOnWorkflowStartOrReplay(WorkflowExecutionContext.this, task, continuationInstructions.customBehaviorExplanation, continuationInstructions.stepToReplayFrom); |
| |
| taskId = task.getId(); |
| |
| return task; |
| } |
| |
| public boolean isDisabled() { |
| if (allowInternallyEvenIfDisabled) return false; |
| if (Boolean.TRUE.equals(replayableDisabled)) return true; |
| return false; |
| } |
| |
| public void checkNotDisabled() { |
| if (isDisabled()) throw new IllegalStateException("Replays disabled on "+WorkflowExecutionContext.this); |
| } |
| } |
| |
| private boolean isSubmitterAncestor(Task current, Task<Object> possibleAncestor) { |
| if (current==null) return false; |
| if (current.equals(possibleAncestor)) return true; |
| return isSubmitterAncestor(current.getSubmittedByTask(), possibleAncestor); |
| } |
| |
| public Entity getEntity() { |
| return entity; |
| } |
| |
| @JsonIgnore |
| public ManagementContext getManagementContext() { |
| return ((EntityInternal)getEntity()).getManagementContext(); |
| } |
| |
| @JsonIgnore |
| protected WorkflowStatePersistenceViaSensors getPersister() { |
| return new WorkflowStatePersistenceViaSensors(getManagementContext()); |
| } |
| |
| public void persist() { |
| if (isInErrorHandlerSubWorkflow()) { |
| // currently don't persist error handler sub-workflows |
| return; |
| } |
| getPersister().checkpoint(this); |
| } |
| |
| /** Get the value of the input. Supports Brooklyn DSL resolution but NOT Freemarker resolution. */ |
| public Object getInput(String key) { |
| return getInputMaybe(key, TypeToken.of(Object.class), Maybe.ofAllowingNull(null)).get(); |
| } |
| public <T> Maybe<T> getInputMaybe(String key, TypeToken<T> type, Maybe<T> valueIfUndefined) { |
| if (!input.containsKey(key)) return valueIfUndefined; |
| |
| if (inputResolved.containsKey(key)) return Maybe.ofAllowingNull((T)inputResolved.get(key)); |
| |
| Object v = input.get(key); |
| // normally do DSL resolution/coercion only, not workflow syntax here (as no workflow scope); |
| // except if we are in a nested workflow, we allow resolving from the parent. |
| // (alternatively we could resolve when starting the custom workflow; that might be better.) |
| Maybe<T> vm = null; |
| if (v instanceof String && parent!=null && parent.getCurrentStepInstance()!=null) { |
| try { |
| vm = Maybe.of(parent.getCurrentStepInstance().resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, (String) v, type)); |
| } catch (Exception e) { |
| Exceptions.propagateIfFatal(e); |
| vm = Maybe.absent(e); |
| } |
| } |
| if (vm==null || vm.isAbsent()) { |
| Maybe<T> vm2 = Tasks.resolving(v).as(type).context(getEntity()).immediately(true).deep().getMaybe(); |
| if (vm2.isPresent() || vm==null) vm = vm2; // if errors in both, prefer error in first |
| } |
| if (vm.isPresent()) { |
| if (WorkflowStepInstanceExecutionContext.REMEMBER_RESOLVED_INPUT) { |
| // this will keep spending time resolving, but will resolve the resolved value |
| inputResolved.put(key, vm.get()); |
| } |
| } |
| return vm; |
| } |
| |
| public TypeToken<?> lookupType(String typeName, Supplier<TypeToken<?>> ifUnset) { |
| if (Strings.isBlank(typeName)) return ifUnset.get(); |
| BrooklynClassLoadingContext loader = getEntity() != null ? RegisteredTypes.getClassLoadingContext(getEntity()) : null; |
| return new BrooklynTypeNameResolution.BrooklynTypeNameResolver("workflow", loader, true, true).getTypeToken(typeName); |
| } |
| |
| /** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)} but without type coercion */ |
| public Object resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, String expression) { |
| return resolve(stage, expression, Object.class); |
| } |
| |
| /** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)} */ |
| public <T> T resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, Class<T> type) { |
| return resolve(stage, expression, TypeToken.of(type)); |
| } |
| |
| /** resolution of ${interpolation} and $brooklyn:dsl and deferred suppliers, followed by type coercion. |
| * if the type is a string, null is not permitted, otherwise it is. */ |
| public <T> T resolve(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) { |
| return new WorkflowExpressionResolution(this, stage, false, false).resolveWithTemplates(expression, type); |
| } |
| |
| public <T> T resolveCoercingOnly(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) { |
| return new WorkflowExpressionResolution(this, stage, false, false).resolveCoercingOnly(expression, type); |
| } |
| |
| /** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)}, but returning DSL/supplier for values (so the indication of their dynamic nature is preserved, even if the value returned by it is resolved; |
| * this is needed e.g. for conditions which treat dynamic expressions differently to explicit values) */ |
| public <T> T resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) { |
| return new WorkflowExpressionResolution(this, stage, false, true).resolveWithTemplates(expression, type); |
| } |
| |
| /** as {@link #resolve(WorkflowExpressionResolution.WorkflowExpressionStage, Object, TypeToken)}, but waiting on any expressions which aren't ready */ |
| public <T> T resolveWaiting(WorkflowExpressionResolution.WorkflowExpressionStage stage, Object expression, TypeToken<T> type) { |
| return new WorkflowExpressionResolution(this, stage, true, false).resolveWithTemplates(expression, type); |
| } |
| |
| /** resolution of ${interpolation} and $brooklyn:dsl and deferred suppliers, followed by type coercion */ |
| public <T> T resolveConfig(WorkflowExpressionResolution.WorkflowExpressionStage stage, ConfigBag config, ConfigKey<T> key) { |
| Object v = config.getStringKey(key.getName()); |
| if (v==null) return null; |
| return resolve(stage, v, key.getTypeToken()); |
| } |
| |
| public WorkflowStatus getStatus() { |
| return status; |
| } |
| |
| void updateStatus(WorkflowStatus newStatus) { |
| status = newStatus; |
| lastStatusChangeTime = Instant.now(); |
| } |
| |
| @JsonIgnore |
| public Instant getLastStatusChangeTime() { |
| return lastStatusChangeTime; |
| } |
| |
| public Integer getCurrentStepIndex() { |
| return currentStepIndex; |
| } |
| |
| public WorkflowStepInstanceExecutionContext getCurrentStepInstance() { |
| return currentStepInstance; |
| } |
| |
| public Integer getPreviousStepIndex() { |
| return previousStepIndex; |
| } |
| |
| // clear this when error handler is not running |
| transient Object lastErrorHandlerOutput = null; |
| |
| @JsonIgnore |
| public Object getPreviousStepOutput() { |
| if (lastErrorHandlerOutput!=null) return lastErrorHandlerOutput; |
| if (previousStepIndex==null) return null; |
| |
| OldStepRecord last = oldStepInfo.get(previousStepIndex); |
| if (last!=null && last.context!=null) return last.context.output; |
| return null; |
| } |
| |
| public Object getOutput() { |
| return output; |
| } |
| |
| public String getName() { |
| return name; |
| } |
| |
| public String getWorkflowId() { |
| return workflowId; |
| } |
| |
| public String getTaskId() { |
| return taskId; |
| } |
| |
| public Set<WorkflowReplayUtils.WorkflowReplayRecord> getReplays() { |
| return replays; |
| } |
| |
| public Integer getReplayableLastStep() { |
| return replayableLastStep; |
| } |
| |
| public WorkflowStepInstanceExecutionContext getErrorHandlerContext() { |
| return errorHandlerContext; |
| } |
| |
| @JsonIgnore |
| public String getRetentionHash() { |
| if (retention!=null && Strings.isNonBlank(retention.hash)) return retention.hash; |
| if (Strings.isNonBlank(getName())) return getName(); |
| return "anonymous-workflow-"+Math.abs(getStepsDefinition().hashCode()); |
| } |
| |
| public void updateRetentionFrom(WorkflowRetentionAndExpiration.WorkflowRetentionSettings other) { |
| WorkflowRetentionAndExpiration.WorkflowRetentionSettings r = getRetentionSettings(); |
| r.updateFrom(other); |
| retention = r; |
| retentionDefault = null; |
| } |
| |
| @JsonIgnore private transient WorkflowRetentionAndExpiration.WorkflowRetentionSettings retentionDefault; |
| @JsonIgnore |
| public WorkflowRetentionAndExpiration.WorkflowRetentionSettings getRetentionSettings() { |
| if (retention==null) { |
| if (retentionDefault==null) { |
| retentionDefault = new WorkflowRetentionAndExpiration.WorkflowRetentionSettings().init(this); |
| } |
| return retentionDefault; |
| } |
| return retention; |
| } |
| |
| public void markShutdown() { |
| log.debug(this+" was "+this.status+" but now marking as "+WorkflowStatus.ERROR_SHUTDOWN+"; compensating workflow should be triggered shortly"); |
| this.updateStatus(WorkflowStatus.ERROR_SHUTDOWN); |
| // don't persist; that will happen when workflows are kicked off |
| } |
| |
| @JsonIgnore |
| /** Error handlers _could_ launch sub-workflow, but they typically don't */ |
| protected boolean isInErrorHandlerSubWorkflow() { |
| if (getParent()!=null) { |
| if (getParent().getErrorHandlerContext()!=null) { |
| return true; |
| } |
| return getParent().isInErrorHandlerSubWorkflow(); |
| } |
| return false; |
| } |
| |
| /** look in tasks, steps, and replays to find most recent activity */ |
| // keep on jackson serialization for api? |
| public long getMostRecentActivityTime() { |
| AtomicLong result = new AtomicLong(-1); |
| |
| Consumer<Long> consider = l -> { |
| if (l!=null && l>result.get()) result.set(l); |
| }; |
| Consumer<Task> considerTask = task -> { |
| if (task!=null) { |
| consider.accept(task.getEndTimeUtc()); |
| consider.accept(task.getStartTimeUtc()); |
| consider.accept(task.getSubmitTimeUtc()); |
| } |
| }; |
| considerTask.accept(getTask(false).orNull()); |
| |
| Consumer<WorkflowReplayUtils.WorkflowReplayRecord> considerReplay = replay -> { |
| if (replay!=null) { |
| consider.accept(replay.endTimeUtc); |
| consider.accept(replay.startTimeUtc); |
| consider.accept(replay.submitTimeUtc); |
| } |
| }; |
| if (replayCurrent!=null) { |
| considerReplay.accept(replayCurrent); |
| } else if (!replays.isEmpty()) { |
| considerReplay.accept(Iterables.getLast(replays)); |
| } |
| |
| if (currentStepInstance!=null) { |
| considerTask.accept(getManagementContext().getExecutionManager().getTask(currentStepInstance.getTaskId())); |
| } |
| |
| return result.get(); |
| } |
| |
| public List<Object> getStepsDefinition() { |
| return MutableList.copyOf(stepsDefinition).asUnmodifiable(); |
| } |
| |
| transient Map<String,Pair<Integer,WorkflowStepDefinition>> stepsWithExplicitId; |
| transient List<WorkflowStepDefinition> stepsResolved; |
| @JsonIgnore |
| List<WorkflowStepDefinition> getStepsResolved() { |
| if (stepsResolved ==null) { |
| stepsResolved = MutableList.copyOf(WorkflowStepResolution.resolveSteps(getManagementContext(), WorkflowExecutionContext.this.stepsDefinition)); |
| } |
| return stepsResolved; |
| } |
| |
| @JsonIgnore |
| public Map<String, Pair<Integer,WorkflowStepDefinition>> getStepsWithExplicitIdById() { |
| if (stepsWithExplicitId ==null) stepsWithExplicitId = computeStepsWithExplicitIdById(getStepsResolved()); |
| return stepsWithExplicitId; |
| } |
| |
| |
| protected class Body implements Callable<Object> { |
| private WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions; |
| private Runnable intro = null; |
| private int stepsRun = 0; |
| |
| public Body() {} |
| |
/** Creates a body which will replay according to the given continuation instructions. */
public Body(WorkflowStepDefinition.ReplayContinuationInstructions continuationInstructions) {
    super();
    this.continuationInstructions = continuationInstructions;
}
| |
| @Override |
| public String toString() { |
| return "WorkflowExecutionContext.Body["+workflowId+"; " + continuationInstructions + "]"; |
| } |
| |
| @Override |
| public Object call() throws Exception { |
| if (intro!=null) { |
| // intro needed to make dangling resumption wait |
| intro.run(); |
| } |
| return callWithLock(this::callSteps); |
| } |
| |
| protected Object callWithLock(Callable<Callable<Object>> handler) throws Exception { |
| AttributeSensor<String> lockSensor0 = null; |
| Entity lockEntity0 = null; |
| if (lock != null) { |
| String lockName = null; |
| if (lock instanceof String) lockName = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, lock, TypeToken.of(String.class)); |
| else if (lock instanceof Map) { |
| lockName = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, ((Map)lock).get("name"), TypeToken.of(String.class)); |
| |
| Object lockEntity00 = ((Map)lock).get("entity"); |
| if (lockEntity00!=null) { |
| lockEntity0 = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.WORKFLOW_INPUT, lockEntity00, TypeToken.of(Entity.class)); |
| log.debug(WorkflowExecutionContext.this + " using lock " + lockName + " on entity " + lockEntity0); |
| } |
| } |
| if (lockName==null) throw new IllegalArgumentException("Invalid lock object, should be a string or a map indicating a name and optional entity"); |
| lockSensor0 = Sensors.newStringSensor("lock-for-" + lockName); |
| if (lockEntity0==null) lockEntity0 = getEntity(); |
| } |
| AttributeSensor<String> lockSensor = lockSensor0; |
| Entity lockEntity = lockEntity0; |
| |
| AtomicBoolean mustClearLock = new AtomicBoolean(false); |
| if (lockSensor!=null) { |
| // acquire lock |
| // - step: set-sensor lock-for-count = ${workflow.id} |
| // require: |
| // any: |
| // - when: absent |
| // - equals: ${workflow.id} |
| // on-error: |
| // - retry backoff 50ms increasing 2x up to 5s |
| String wid = getWorkflowId(); |
| Duration delay = null; |
| String lastHolder = null; |
| while (true) { |
| AtomicReference<String> holder = new AtomicReference<>(); |
| lockEntity.sensors().modify(lockSensor, old -> { |
| if (old == null || old.equals(wid)) { |
| if (old != null) { |
| if (continuationInstructions != null) { |
| log.debug(WorkflowExecutionContext.this+" reasserting lock on " + lockSensor.getName() + " during replay"); |
| } else { |
| // i don't think this should be possible |
| log.warn("Entering block with lock on " + lockSensor.getName() + " when this workflow already holds the lock"); |
| } |
| } else { |
| log.debug(WorkflowExecutionContext.this+" acquired lock on " + lockSensor.getName()); |
| } |
| mustClearLock.set(true); |
| return Maybe.of(wid); |
| } |
| log.debug("Blocked by lock on " + lockSensor.getName() + ", currently held by " + old); |
| holder.set(old); |
| return Maybe.absent(); |
| }); |
| if (mustClearLock.get()) break; |
| // didn't get lock; probably do a retry |
| try { |
| if (delay==null || !Objects.equals(lastHolder, holder.get())) { |
| // reset initially, and if we observe the lock holder to change (highly competitive environment) |
| delay = Duration.millis(5); |
| } |
| Duration ddelay = delay; |
| Tasks.withBlockingDetails("Waiting for lock on " + lockSensor.getName() +" (held by workflow "+holder.get()+")", () -> { Time.sleep(ddelay); return null; }); |
| // increment delay for next time |
| delay = Duration.max(delay.multiply(1 + Math.random()), Duration.seconds(5)); |
| } catch (Exception e) { |
| throw Exceptions.propagate(e); |
| } |
| } |
| } |
| |
| Callable<Object> endHandler; |
| try { |
| endHandler = handler.call(); //super.doTaskBodyPossiblyWithLock(context, handler, isReplaying, continuationInstructions); |
| |
| } finally { |
| // clear if we set it, whether initially, on real replay, or on injected dangling failure |
| if (mustClearLock.get()) { |
| try { |
| DynamicTasks.waitForLast(); |
| } finally { |
| if (Entities.isUnmanagingOrNoLongerManaged(getEntity())) { |
| log.debug("Skipping clearance of lock on "+lockSensor.getName()+" in "+WorkflowExecutionContext.this+" because entity unmanaging here; expect auto-replay on resumption to pick up"); |
| } else { |
| Threads.runTemporarilyUninterrupted(() -> { |
| log.debug(WorkflowExecutionContext.this + " releasing lock on " + lockSensor.getName()); |
| ((EntityInternal.SensorSupportInternal) lockEntity.sensors()).remove(lockSensor); |
| }); |
| } |
| } |
| } |
| } |
| return endHandler.call(); |
| } |
| |
// set to true when the RecoveryAndReplay loop in callSteps should run another pass:
// after a step returns continuation instructions, after a workflow error handler selects a next step, or on automatic replay
boolean continueOnErrorHandledOrNextReplay;
// set to true by the timeout timer task immediately before it cancels the workflow task
AtomicReference<Boolean> timerCancelled;

/**
 * Runs the workflow's steps, starting the workflow timeout timer if a timeout is set, and looping again whenever
 * a replay or a handled error indicates that execution should continue.
 *
 * @return a callable which finishes the workflow: either recording success and returning the output,
 *         or recording the error and rethrowing it
 * @throws Exception declared for the callable contract; errors raised while running steps are caught
 *         and routed through the workflow error handling, surfacing when the returned callable is invoked
 */
public Callable<Object> callSteps() throws Exception {
    DynamicTasks.swallowChildrenFailures();

    Task<?> timerTask = null;
    timerCancelled = new AtomicReference<>(false);
    try {
        if (timeout != null) {
            timerTask = initializeTimerFromWorkflowTimeout(timerTask);
        }

        // each pass of this loop is one run or replay attempt; it repeats while continueOnErrorHandledOrNextReplay is set
        RecoveryAndReplay: do {
            try {
                boolean replaying = continuationInstructions!=null;
                Integer replayFromStep = replaying ? continuationInstructions.stepToReplayFrom : null;

                if (!replaying) initializeWithoutContinuationInstructions(replayFromStep);

                // reset per-pass state; set again below, or by the error handling, if another pass is needed
                continueOnErrorHandledOrNextReplay = false;
                lastErrorHandlerOutput = null;

                WorkflowReplayUtils.updateOnWorkflowTaskStartupOrReplay(WorkflowExecutionContext.this, task, getStepsResolved(), !replaying, replayFromStep);

                // show task running
                updateStatus(WorkflowStatus.RUNNING);

                if (replaying) initializeFromContinuationInstructions(replayFromStep);

                if (!Objects.equals(taskId, Tasks.current().getId()))
                    throw new IllegalStateException("Running workflow in unexpected task, " + taskId + " does not match " + task);

                int stepsConsidered = 0;
                while (currentStepIndex >= 0 && currentStepIndex < getStepsResolved().size()) {
                    stepsConsidered++;
                    if (replaying && replayFromStep == null) {
                        // resuming the current step: reuse the existing step instance rather than creating a new one
                        // check step number and clear output before re-running
                        if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) {
                            throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance);
                        }
                        currentStepInstance.output = null;
                        currentStepInstance.injectContext(WorkflowExecutionContext.this);

                        log.debug("Replaying workflow '" + name + "', reusing instance " + currentStepInstance + " for step " + workflowStepReference(currentStepIndex) + ")");
                        runCurrentStepInstanceApproved(getStepsResolved().get(currentStepIndex));

                    } else {
                        runCurrentStepIfPreconditions();
                    }

                    // only the first step of a pass is treated as a replay
                    replaying = false;

                    if (continuationInstructions!=null) {
                        // the step just run asked for a custom replay; start a new pass to act on it
                        continueOnErrorHandledOrNextReplay = true;
                        continue RecoveryAndReplay;
                    }
                }

                log.debug("Completed workflow "+workflowId+" successfully; step count: "+stepsConsidered+" considered, "+stepsRun+" executed");

                if (outputDefinition != null) {
                    output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, outputDefinition, Object.class);
                } else {
                    if (stepsRun>0) {
                        // (default is the output of the last step, if there is one, otherwise nothing)
                        // ((unlike steps, workflow output is not available as a default value, but previous step always is, so there is no need to do this
                        // before the above; slight chance if onError is triggered by a failure to resolve something in outputDefinition, but that can be ignored))
                        output = getPreviousStepOutput();
                    }
                }

                updateOnSuccessfulCompletion();

            } catch (Throwable e) {
                try {
                    // a null result means the error was handled (or a replay was arranged) and the loop condition decides what happens next
                    Pair<Throwable, WorkflowStatus> unhandledError = handleErrorAtWorkflow(e);

                    if (unhandledError != null) {
                        return () -> endWithError(unhandledError.getLeft(), unhandledError.getRight());
                    }
                } catch (Throwable e2) {
                    // do not propagateIfFatal, we need to handle most throwables
                    log.debug("Uncaught error in workflow exception handler: "+ e2, e2);
                    return () -> endWithError(e2, WorkflowStatus.ERROR);
                }
            }

        } while (continueOnErrorHandledOrNextReplay);

    } finally {
        // stop the timer if it is still pending and did not itself trigger the cancellation
        if (timerTask != null && !timerTask.isDone() && !timerCancelled.get()) {
            log.debug("Cancelling " + timerTask + " on completion of this task");
            timerTask.cancel(true);
        }
    }

    return this::endWithSuccess;
}
| |
/**
 * Handles an error thrown while running the workflow: classifies it into a provisional status, runs the workflow-level
 * error handler where applicable, and otherwise considers an automatic replay.
 *
 * @param e the error caught by {@code callSteps}
 * @return null if the error was handled or an automatic replay was arranged (the caller then consults
 *         {@code continueOnErrorHandledOrNextReplay}); otherwise the error to report, which may be a replacement
 *         for {@code e}, paired with the status to record
 */
private Pair<Throwable, WorkflowStatus> handleErrorAtWorkflow(Throwable e) {
    boolean isTimeout = false;

    // the timer flags that it triggered the cancellation; only then is a cancellation-like cause treated as a workflow timeout
    if (timerCancelled.get()) {
        if (Exceptions.getCausalChain(e).stream().anyMatch(cause -> cause instanceof TimeoutException || cause instanceof InterruptedException || cause instanceof CancellationException || cause instanceof RuntimeInterruptedException)) {
            // timed out, and a cause is related to cancellation

            // NOTE: this just gets used for logging, it is not ever stored by the calling task or returned to a user;
            // when the task is cancelled, the only information available to callers is that it was cancelled
            // (we could add indications of why but that is a lot of work, and not clear we need it)
            TimeoutException timeoutException = new TimeoutException("Timeout after " + timeout + ": " + getName());
            timeoutException.initCause(e);
            e = timeoutException;
            isTimeout = true;
        }
    }

    WorkflowStatus provisionalStatus;

    // first decide the status to record if the error turns out not to be handled
    if (Exceptions.isCausedByInterruptInAnyThread(e) || Exceptions.getFirstThrowableMatching(e,
            // could do with a more precise check that workflow is cancelled or has timed out; exceptions could come from other unrelated causes (but unlikely)
            t -> t instanceof CancellationException || t instanceof TimeoutException) != null) {
        if (!Thread.currentThread().isInterrupted()) {
            // might be a data model error
            if (Exceptions.getFirstThrowableOfType(e, TemplateProcessor.TemplateModelDataUnavailableException.class) != null) {
                provisionalStatus = WorkflowStatus.ERROR;
            } else {
                // cancelled or a subtask interrupted
                provisionalStatus = WorkflowStatus.ERROR_CANCELLED;
            }
        } else {
            provisionalStatus = WorkflowStatus.ERROR_CANCELLED;
        }
        if (provisionalStatus == WorkflowStatus.ERROR_CANCELLED) {
            // refine a cancellation according to why it probably happened
            if (!getManagementContext().isRunning()) {
                // if mgmt is shutting down we should record that. maybe enough if thread is interrupted we note that.
                provisionalStatus = WorkflowStatus.ERROR_SHUTDOWN;
            } else if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
                provisionalStatus = WorkflowStatus.ERROR_ENTITY_DESTROYED;
            }
        }
    } else {
        provisionalStatus = WorkflowStatus.ERROR;
    }

    boolean errorHandled = false;
    if (isTimeout) {
        // don't run error handler
        log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));

    } else if (Thread.currentThread().isInterrupted()) {
        // don't run error handler
        log.debug("Interrupt in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));

    } else if (onError != null && (!(onError instanceof Collection) || !((Collection)onError).isEmpty())) {
        // an error handler is defined (and, if it is a collection, it is not empty), so run it as a queued task
        try {
            log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", running error handler");
            Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
            errorHandlerTaskId = workflowErrorHandlerTask.getId();
            WorkflowErrorHandling.WorkflowErrorHandlingResult result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
            if (result != null) {
                errorHandled = true;

                // NOTE(review): currentStepInstance is dereferenced without a null check; an NPE here would be caught below and reported as a handler failure
                currentStepInstance.next = WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END);
                if (result.output != null) output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class);

                moveToNextStep("Handled error in workflow around step " + workflowStepReference(currentStepIndex), result.next==null);

                // if the handler led to a replay or to a step which exists, ask callSteps to run another pass
                if (continuationInstructions!=null || currentStepIndex < getStepsResolved().size()) {
                    continueOnErrorHandledOrNextReplay = true;
                    return null;
                }
            } // else errorHandled remains false and will fail below

        } catch (Exception e2) {
            Throwable e0 = e;
            if (Exceptions.getCausalChain(e2).stream().anyMatch(e3 -> e3==e0)) {
                // wraps/rethrows original, don't need to log, but do return the new one
                e = e2;
//                } else if (Exceptions.isCausedByInterruptInAnyThread(e) && Exceptions.isCausedByInterruptInAnyThread(e2)) {
//                    // now handled above
//                    // if both are interrupted we can drop the trace, and return original;
//                    log.debug("Error where error handler was interrupted, after main thread was also interrupted: " + e2);
//                    log.trace("Full trace of original error was: " + e, e);
            } else {
                log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
                log.debug("Full trace of original error was: " + e, e);
                e = e2;
            }
        }

    } else {
        log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", no error handler so rethrowing: " + Exceptions.collapseText(e));
    }

    // handled, with nothing further to run: callSteps exits its loop and the workflow ends via the success path
    if (errorHandled) return null;

    if (replayAutomaticallyIfAppropriate(e)) return null;

    return Pair.of(e, provisionalStatus);
}
| |
| private boolean replayAutomaticallyIfAppropriate(Throwable e) { |
| if (Boolean.TRUE.equals(replayableAutomatically) && Exceptions.getFirstThrowableOfType(e, DanglingWorkflowException.class)!=null) { |
| log.info("Automatic replay indicated for "+WorkflowExecutionContext.this+" when detected as dangling on server startup"); |
| |
| currentStepInstance.next = factory(true).makeInstructionsForReplayResuming("Replay resuming on dangling", false); |
| continueOnErrorHandledOrNextReplay = true; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private void updateOnSuccessfulCompletion() { |
| // finished -- checkpoint noting previous step and null for current because finished |
| updateStatus(WorkflowStatus.SUCCESS); |
| replayableLastStep = STEP_INDEX_FOR_END; |
| // record how it ended |
| oldStepInfo.compute(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> { |
| if (old == null) old = new OldStepRecord(); |
| old.next = MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next); |
| old.nextTaskId = null; |
| return old; |
| }); |
| oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> { |
| if (old == null) old = new OldStepRecord(); |
| old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous); |
| old.previousTaskId = previousStepTaskId; |
| return old; |
| }); |
| } |
| |
| private void resetWorkflowContextPreviousAndScratchVarsToStep(Integer step, boolean requireLastStep) { |
| if (step==null) { |
| // when resuming, keep them as they were set |
| return; |
| } |
| OldStepRecord last = oldStepInfo.get(step); |
| if (last != null) { |
| workflowScratchVariables = last.workflowScratch; |
| previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null); |
| |
| } else { |
| if (requireLastStep) { |
| throw new IllegalStateException("Last step record required for step "+step+" to replay from there"); |
| } else { |
| // no such step; probably starting something which has never been run (not uncommon), or at a step which was never run (not sure if possible); |
| // in any case just keep the scratch vars as they were |
| } |
| } |
| // and ensure not null |
| if (workflowScratchVariables == null) workflowScratchVariables = MutableMap.of(); |
| } |
| |
| private void initializeFromContinuationInstructions(Integer replayFromStep) { |
| if (replayFromStep != null && replayFromStep == STEP_INDEX_FOR_START) { |
| log.debug("Replaying workflow '" + name + "', from start " + |
| "(was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")"); |
| resetWorkflowContextPreviousAndScratchVarsToStep(replayFromStep, false); |
| currentStepIndex = 0; |
| |
| } else if (replayFromStep != null && replayFromStep == STEP_INDEX_FOR_END) { |
| log.debug("Replaying workflow '" + name + "', from end " + |
| "(was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")"); |
| currentStepIndex = STEP_INDEX_FOR_END; |
| resetWorkflowContextPreviousAndScratchVarsToStep(replayFromStep, false); |
| currentStepInstance = null; |
| |
| } else { |
| // replaying workflow |
| log.debug("Replaying workflow '" + name + "', from step " + (replayFromStep == null ? "<CURRENT>" : workflowStepReference(replayFromStep)) + |
| " (was at " + (currentStepIndex == null ? "<UNSTARTED>" : workflowStepReference(currentStepIndex)) + ")"); |
| if (replayFromStep == null) { |
| // replayFromLast should correct the cases below |
| if (currentStepIndex == null) { |
| throw new IllegalStateException("Invalid instructions to continue from last bypassing convenience method, and there is no last"); |
| } else if (currentStepInstance == null || currentStepInstance.stepIndex != currentStepIndex) { |
| throw new IllegalStateException("Invalid instructions to continue from last step which is unknown, bypassing convenience method"); |
| } |
| } else { |
| currentStepIndex = replayFromStep; |
| } |
| // must reset, but okay if null, in that case we are continuing to a step which hasn't been initialized yet; use scratch vars |
| resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false); |
| } |
| if (continuationInstructions.customWorkflowScratchVariables!=null) { |
| workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables); |
| } |
| |
| } |
| |
| private void initializeWithoutContinuationInstructions(Integer replayFromStep) { |
| if (replayFromStep == null && currentStepIndex == null) { |
| currentStepIndex = 0; |
| log.debug("Starting workflow '" + name + "', moving to first step " + workflowStepReference(currentStepIndex)); |
| |
| } else if (replayFromStep==null && continueOnErrorHandledOrNextReplay) { |
| // workflow error handler indicated a next step to run |
| |
| } else { |
| // shouldn't come here |
| continueOnErrorHandledOrNextReplay = false; |
| throw new IllegalStateException("Should either be replaying or unstarted, but not invoked as replaying, and current=" + currentStepIndex + " replay=" + replayFromStep); |
| } |
| } |
| |
| private Task<?> initializeTimerFromWorkflowTimeout(Task<?> timerTask) { |
| Task<?> otherTask = Tasks.current(); |
| timerTask = Entities.submit(getEntity(), Tasks.builder().displayName("Timer for " + WorkflowExecutionContext.this.toString() + ":" + taskId) |
| .body(() -> { |
| try { |
| Time.sleep(timeout); |
| if (!otherTask.isDone()) { |
| timerCancelled.set(true); |
| log.debug("Cancelling " + otherTask + " after timeout of " + timeout); |
| otherTask.cancel(true); |
| } |
| } catch (Throwable e) { |
| if (Exceptions.isRootCauseIsInterruption(e)) { |
| // normal, just exit |
| } else { |
| throw Exceptions.propagate(e); |
| } |
| } |
| }).dynamic(false) |
| .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) |
| .tag(BrooklynTaskTags.INESSENTIAL_TASK) |
| .build()); |
| return timerTask; |
| } |
| |
| private Object endWithSuccess() { |
| WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, output); |
| persist(); |
| return output; |
| } |
| |
| private Object endWithError(Throwable e, WorkflowStatus provisionalStatus) { |
| updateStatus(provisionalStatus); |
| WorkflowReplayUtils.updateOnWorkflowError(WorkflowExecutionContext.this, task, e); |
| |
| try { |
| log.debug("Error running workflow " + this + "; will persist then rethrow: " + e); |
| log.trace("Error running workflow " + this + "; will persist then rethrow (details): " + e, e); |
| |
| } catch (Throwable e2) { |
| if (Entities.isUnmanagingOrNoLongerManaged(getEntity())) { |
| log.trace("Error persisting workflow (entity ending) " + this + " after error in workflow; persistence error (details): " + e2, e2); |
| } else { |
| log.error("Error persisting workflow " + this + " after error in workflow; persistence error: " + e2); |
| log.debug("Error persisting workflow " + this + " after error in workflow; persistence error (details): " + e2, e2); |
| log.warn("Error running workflow " + this + ", rethrowing without persisting because of persistence error (above): " + e); |
| } |
| log.trace("Error running workflow " + this + ", rethrowing without persisting because of persistence error (above): " + e, e); |
| } |
| |
| throw Exceptions.propagate(e); |
| } |
| |
| protected void runCurrentStepIfPreconditions() { |
| WorkflowStepDefinition step = getStepsResolved().get(currentStepIndex); |
| if (step!=null) { |
| currentStepInstance = new WorkflowStepInstanceExecutionContext(currentStepIndex, step, WorkflowExecutionContext.this); |
| DslPredicates.DslPredicate conditionResolved = step.getConditionResolved(currentStepInstance); |
| if (conditionResolved!=null) { |
| if (log.isTraceEnabled()) log.trace("Considering condition "+step.condition+" for "+ workflowStepReference(currentStepIndex)); |
| boolean conditionMet = DslPredicates.evaluateDslPredicateWithBrooklynObjectContext(conditionResolved, WorkflowExecutionContext.this, getEntityOrAdjunctWhereRunning()); |
| if (log.isTraceEnabled()) log.trace("Considered condition "+step.condition+" for "+ workflowStepReference(currentStepIndex)+": "+conditionMet); |
| if (!conditionMet) { |
| moveToNextStep("Skipping step "+ workflowStepReference(currentStepIndex), false); |
| return; |
| } |
| } |
| |
| // no condition or condition met -- record and run the step |
| |
| runCurrentStepInstanceApproved(step); |
| |
| } else { |
| // moving to floor/ceiling in treemap made sense when numero-ordered IDs are used, but not with list |
| throw new IllegalStateException("Cannot find step "+currentStepIndex); |
| } |
| } |
| |
| private void runCurrentStepInstanceApproved(WorkflowStepDefinition step) { |
| stepsRun++; |
| |
| Task<?> t; |
| if (continuationInstructions!=null) { |
| t = step.newTaskContinuing(currentStepInstance, continuationInstructions); |
| continuationInstructions = null; |
| } else { |
| t = step.newTask(currentStepInstance); |
| } |
| |
| // about to run -- checkpoint noting current and previous steps, and updating replayable from info |
| OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> { |
| if (old == null) old = new OldStepRecord(); |
| old.countStarted++; |
| if (!workflowScratchVariables.isEmpty()) |
| old.workflowScratch = MutableMap.copyOf(workflowScratchVariables); |
| else old.workflowScratch = null; |
| old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous); |
| old.previousTaskId = previousStepTaskId; |
| old.nextTaskId = null; |
| return old; |
| }); |
| WorkflowReplayUtils.updateReplayableFromStep(WorkflowExecutionContext.this, step); |
| oldStepInfo.compute(previousStepIndex==null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> { |
| if (old==null) old = new OldStepRecord(); |
| old.next = MutableSet.<Integer>of(currentStepIndex).putAll(old.next); |
| old.nextTaskId = t.getId(); |
| return old; |
| }); |
| |
| errorHandlerContext = null; |
| errorHandlerTaskId = null; |
| currentStepInstance.next = null; // clear, eg if was set from a previous run; will be reset from step definition |
| // but don't clear output, in case a step is returning to itself and wants to reference previous_step.output |
| |
| persist(); |
| |
| BiConsumer<Object,Object> onFinish = (output,overrideNext) -> { |
| currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step); |
| if (output!=null) currentStepInstance.output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, output, Object.class); |
| }; |
| |
| // now run the step |
| try { |
| Duration duration = step.getTimeout(); |
| if (duration!=null) { |
| boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration); |
| if (isEnded) { |
| currentStepInstance.output = t.getUnchecked(); |
| } else { |
| t.cancel(true); |
| throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName()); |
| } |
| } else { |
| currentStepInstance.output = DynamicTasks.queue(t).getUnchecked(); |
| } |
| |
| // allow output to be customized / overridden |
| onFinish.accept(step.output, null); |
| |
| } catch (Exception e) { |
| handleErrorAtStep(step, t, onFinish, e); |
| } |
| |
| oldStepInfo.compute(currentStepIndex, (index, old) -> { |
| if (old==null) { |
| log.warn("Lost old step info for "+this+", step "+index); |
| old = new OldStepRecord(); |
| } |
| old.countCompleted++; |
| // okay if this gets picked up by accident because we will check the stepIndex it records against the currentStepIndex, |
| // and ignore it if different |
| old.context = currentStepInstance; |
| return old; |
| }); |
| |
| previousStepTaskId = currentStepInstance.taskId; |
| previousStepIndex = currentStepIndex; |
| moveToNextStep("Completed step "+ workflowStepReference(currentStepIndex), false); |
| } |
| |
| private void handleErrorAtStep(WorkflowStepDefinition step, Task<?> stepTaskThrowingError, BiConsumer<Object, Object> onFinish, Exception error) { |
| WorkflowErrorHandling.handleErrorAtStep(getEntity(), step, currentStepInstance, stepTaskThrowingError, onFinish, error, null); |
| } |
| |
| private void logWarnOnExceptionOrDebugIfKnown(Exception e, String msg) { |
| WorkflowErrorHandling.logWarnOnExceptionOrDebugIfKnown(getEntity(), e, msg); |
| } |
| |
| private void moveToNextStep(String prefix, boolean inferredNext) { |
| prefix = prefix + "; "; |
| |
| Object specialNext = WorkflowReplayUtils.getNext(currentStepInstance); |
| |
| continuationInstructions = specialNext instanceof WorkflowStepDefinition.ReplayContinuationInstructions ? (WorkflowStepDefinition.ReplayContinuationInstructions) specialNext : null; |
| if (continuationInstructions!=null) { |
| log.debug(prefix + "proceeding to custom replay: "+continuationInstructions); |
| return; |
| } |
| |
| if (specialNext==null) { |
| currentStepIndex++; |
| if (currentStepIndex < getStepsResolved().size()) { |
| log.debug(prefix + "moving to sequential next step " + workflowStepReference(currentStepIndex)); |
| } else { |
| log.debug(prefix + "no further steps: Workflow completed"); |
| } |
| } else if (specialNext instanceof String) { |
| String explicitNext = (String)specialNext; |
| Maybe<Pair<Integer, Boolean>> nextResolved = getIndexOfStepId(explicitNext); |
| if (nextResolved.isAbsent()) { |
| log.warn(prefix + (inferredNext ? "inferred" : "explicit") + " next step '"+explicitNext+"' not found (failing)"); |
| // throw |
| nextResolved.get(); |
| } |
| if (nextResolved.get().getLeft()==null) { |
| throw new IllegalArgumentException("Next step '"+explicitNext+"' not supported here"); |
| } |
| |
| currentStepIndex = nextResolved.get().getLeft(); |
| if (nextResolved.get().getRight()) { |
| if (currentStepIndex < getStepsResolved().size()) { |
| log.debug(prefix + "moving to "+(inferredNext ? "inferred" : "explicit")+" next step " + workflowStepReference(currentStepIndex) + " for token '" + explicitNext + "'"); |
| } else { |
| log.debug(prefix + (inferredNext ? "inferred" : "explicit") + " next step '"+explicitNext+"': Workflow completed"); |
| } |
| } else { |
| log.debug(prefix + "moving to "+(inferredNext ? "inferred" : "explicit")+" next step " + workflowStepReference(currentStepIndex) + " for id '" + explicitNext + "'"); |
| } |
| } else { |
| throw new IllegalStateException("Illegal next definition: "+specialNext+" (type "+specialNext.getClass()+")"); |
| } |
| } |
| |
| /** |
|  * Builds a reference string for the step at the given index, for use in log messages. |
|  * |
|  * @param index step index; may be null, negative, or at/after the end of the resolved step list |
|  * @return the workflow id followed by "-&lt;no-step&gt;" when the index is null; |
|  *         otherwise the value of {@code getWorkflowStepReference} for that index, using no step id for a negative index, |
|  *         "&lt;END&gt;" as the step id for an index at or past the end, and the resolved step definition otherwise |
|  */ |
| String workflowStepReference(Integer index) { |
|     if (index==null) return workflowId+"-<no-step>"; |
| |
|     // negative indices are not positions in the step list, so they must not reach List.get; |
|     // the three-argument overload already knows how to label them |
|     if (index<0) return getWorkflowStepReference(index, null, false); |
|     if (index>=getStepsResolved().size()) return getWorkflowStepReference(index, "<END>", false); |
|     return getWorkflowStepReference(index, getStepsResolved().get(index)); |
| } |
| |
| /** |
|  * Stores the supplied runnable in the {@code intro} field. |
|  * |
|  * @param intro the runnable to store |
|  * @return this instance, so calls can be chained |
|  */ |
| public Body withIntro(Runnable intro) { |
|     this.intro = intro; |
|     return this; |
| } |
| } |
| |
| /** |
|  * Resolves a "next step" name to a step index. |
|  * <p> |
|  * The name is first looked up, case-insensitively, among {@code PREDEFINED_NEXT_TARGETS}; |
|  * if there is no match it is looked up, exactly as given, among the steps that declare an explicit id. |
|  * |
|  * @param next the name to resolve; may be null |
|  * @return a present value holding the index on the left and, on the right, true when a predefined target matched |
|  *         or false when a step id matched; an absent value when {@code next} is null or nothing matched. |
|  *         Callers in this file check the left value for null before using it. |
|  */ |
| public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) { |
|     if (next==null) return Maybe.absent("Null step ID supplied"); |
|     // lower-case with a fixed locale so the lookup does not depend on the JVM default locale |
|     Function<WorkflowExecutionContext, Integer> predefined = PREDEFINED_NEXT_TARGETS.get(next.toLowerCase(java.util.Locale.ROOT)); |
|     if (predefined!=null) return Maybe.of(Pair.of(predefined.apply(this), true)); |
|     Pair<Integer, WorkflowStepDefinition> explicit = getStepsWithExplicitIdById().get(next); |
|     if (explicit!=null) return Maybe.of(Pair.of(explicit.getLeft(), false)); |
|     return Maybe.absent(new NoSuchElementException("Step with ID '"+next+"' not found")); |
| } |
| |
| /** |
|  * Builds a non-error reference string for a step, taking the step id from the given definition when one is supplied. |
|  * |
|  * @param index the step index |
|  * @param step the step definition, or null when there is none |
|  * @return the reference produced by the three-argument overload with {@code isError} false |
|  */ |
| String getWorkflowStepReference(int index, WorkflowStepDefinition step) { |
|     String stepId = null; |
|     if (step!=null) { |
|         stepId = step.id; |
|     } |
|     return getWorkflowStepReference(index, stepId, false); |
| } |
| /** |
|  * Assembles a reference string from the workflow id, an index part, an optional step id and an optional error-handler label. |
|  * <p> |
|  * The error handler step number is not always available here, and the id could be confused with the error-handler label; |
|  * that is acceptable because this form is only for logging. For canonical usage prefer the Task-based overload. |
|  * |
|  * @param index the step index; a non-negative value is shown one-based, a negative value is shown via {@code indexCode} |
|  *        (and omitted altogether when it is the error-handler index and {@code isError} is set) |
|  * @param optionalStepId step id to append when it is non-blank |
|  * @param isError whether to append the error-handler label |
|  * @return the assembled reference |
|  */ |
| String getWorkflowStepReference(int index, String optionalStepId, boolean isError) { |
|     StringBuilder ref = new StringBuilder().append(workflowId); |
| |
|     if (index>=0) { |
|         ref.append("-").append(index+1); |
|     } else if (!(index==STEP_INDEX_FOR_ERROR_HANDLER && isError)) { |
|         ref.append("-").append(indexCode(index)); |
|     } |
| |
|     if (Strings.isNonBlank(optionalStepId)) { |
|         ref.append("-").append(optionalStepId); |
|     } |
|     if (isError) { |
|         ref.append("-").append(LABEL_FOR_ERROR_HANDLER); |
|     } |
|     return ref.toString(); |
| } |
| public String getWorkflowStepReference(Task<?> t) { |
| BrooklynTaskTags.WorkflowTaskTag wt = BrooklynTaskTags.getWorkflowTaskTag(t, false); |
| if (wt.getErrorHandlerIndex()!=null) { |
| // formula below not suitable for error tasks, but the name should be good |
| return t.getDisplayName(); |
| } |
| return wt.getWorkflowId() |
| +(wt.getStepIndex()!=null && wt.getStepIndex()>=0 && wt.getStepIndex()<stepsDefinition.size() ? "-"+(wt.getStepIndex()+1) : "") |
| +(wt.getErrorHandlerIndex()!=null ? "-error-handler-"+(wt.getErrorHandlerIndex()+1) : "") |
| ; |
| } |
| |
| /** |
|  * Maps one of the special step index constants to its textual name. |
|  * These numbers should not be used for much, but they are used in a few places. |
|  * |
|  * @param index the index to translate |
|  * @return the start, end or error-handler name when the index equals the corresponding constant; |
|  *         otherwise "neg-" followed by the index |
|  */ |
| private String indexCode(int index) { |
|     return index==STEP_INDEX_FOR_START ? STEP_TARGET_NAME_FOR_START |
|             : index==STEP_INDEX_FOR_END ? STEP_TARGET_NAME_FOR_END |
|             : index==STEP_INDEX_FOR_ERROR_HANDLER ? LABEL_FOR_ERROR_HANDLER |
|             : "neg-"+(index); // unknown |
| } |
| |
| } |