blob: 5f48b8a72a7297df03f4d62bdd1129a8154280fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class WorkflowReplayUtils {
private static final Logger log = LoggerFactory.getLogger(WorkflowReplayUtils.class);
public enum ReplayResumeDepthCheck {
RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE,
RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE,
RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT,
}
public static boolean isReplayResumable(WorkflowExecutionContext workflowExecutionContext, ReplayResumeDepthCheck requireDeeplyReplayable, boolean allowInternallyEvenIfDisabled) {
WorkflowStepInstanceExecutionContext csi = workflowExecutionContext.currentStepInstance;
if (csi!=null) {
if (csi.getStepIndex()!=workflowExecutionContext.currentStepIndex) {
// always resumable between steps
return true;
}
WorkflowStepDefinition stepDefinition = workflowExecutionContext.getStepsResolved().get(csi.stepIndex);
if (stepDefinition!=null) {
Boolean idempotence = stepDefinition.isIdempotent(csi);
// 'no' is (currently) not overridable by auto-detection, but `yes` is.
if (Boolean.FALSE.equals(idempotence)) return false;
// if 'yes' or null, we check subworkflows
if (stepDefinition instanceof WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) {
WorkflowStepDefinition.SubWorkflowsForReplay subWorkflowReplayable = ((WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) stepDefinition).getSubWorkflowsForReplay(csi, false, true, allowInternallyEvenIfDisabled);
if (!subWorkflowReplayable.isResumableAtSubworkflows) {
if (subWorkflowReplayable.hasNonResumableWorkflows && requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE) return false;
return subWorkflowReplayable.isResumableOnlyAtParent;
}
// non-null subworkflows, so need to inspect subworkflows
if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT) return true;
if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE)
return subWorkflowReplayable.subworkflows.stream().allMatch(sub -> isReplayableAnywhere(sub, allowInternallyEvenIfDisabled));
if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE)
return subWorkflowReplayable.subworkflows.stream().allMatch(sub -> isReplayResumable(sub, requireDeeplyReplayable, allowInternallyEvenIfDisabled));
// shouldn't come here
throw new IllegalArgumentException("Invalid requireDeeply mode: "+requireDeeplyReplayable);
}
if (idempotence!=null) return idempotence;
// comes here if a workflow step without subworkflows declares null default idempotence, which should not normally happen
return false;
} else {
return isReplayableFromStep(workflowExecutionContext, csi.stepIndex);
}
} else {
// if between steps or at start or end, can use replayable from step
return isReplayableFromStep(workflowExecutionContext, workflowExecutionContext.currentStepIndex);
}
}
static boolean isReplayableFromStep(WorkflowExecutionContext workflowExecutionContext, Integer stepIndex) {
if (stepIndex==null || stepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_START) {
return (Boolean.TRUE.equals(workflowExecutionContext.replayableFromStart) || Objects.equals(workflowExecutionContext.replayableLastStep, WorkflowExecutionContext.STEP_INDEX_FOR_START));
}
if (stepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_END) {
// always "resumable" at end
return true;
}
if (Objects.equals(stepIndex, workflowExecutionContext.currentStepIndex) && Objects.equals(workflowExecutionContext.replayableLastStep, stepIndex)) {
return true;
}
WorkflowExecutionContext.OldStepRecord osi = workflowExecutionContext.oldStepInfo.get(stepIndex);
if (osi!=null) {
return Boolean.TRUE.equals(osi.replayableFromHere);
}
// no step info so hasn't even been registered yet (will be updated before start, but shouldn't come here)
return false;
}
public static boolean isReplayableAnywhere(WorkflowExecutionContext workflowExecutionContext, boolean allowInternallyEvenIfDisabled) {
if (workflowExecutionContext.factory(allowInternallyEvenIfDisabled).isDisabled()) return false;
return workflowExecutionContext.currentStepIndex==null
|| workflowExecutionContext.replayableLastStep !=null
|| Boolean.TRUE.equals(workflowExecutionContext.replayableFromStart)
|| (workflowExecutionContext.currentStepInstance!=null && workflowExecutionContext.currentStepInstance.getStepIndex()!=workflowExecutionContext.currentStepIndex)
|| workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_START
|| workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_END
|| isReplayResumable(workflowExecutionContext, ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE, allowInternallyEvenIfDisabled);
}
/** throws error if any argument non-blank invalid; null if nothing to do; otherwise a consumer which will initialize the WEC */
public static Consumer<WorkflowExecutionContext> updaterForReplayableAtWorkflow(String replayable, String idempotent, boolean isNestedWorkflowStep) {
if (replayable==null) replayable = "";
replayable = replayable.toLowerCase().replaceAll("[^a-z]+", " ").trim();
if (idempotent==null) idempotent = "";
idempotent = idempotent.toLowerCase().replaceAll("[^a-z]+", " ").trim();
boolean idempotentAll = "all".equals(idempotent);
if (idempotentAll) idempotent = "";
if (!Strings.isBlank(idempotent)) throw new IllegalArgumentException("Invalid value for `idempotent` on workflow step");
// replayable:
//
//`enabled` (the default): is is permitted to replay resuming wherever the workflow fails on idempotent steps or where there are explicit replay points
//`disabled`: it is not permitted for callers to replay the workflow, whether operator-driven or automatic; resumable steps and replay points in the workflow are not externally visible (but may still be used by replays triggered within the workflow)
//`from start`: the workflow start is a replay point
//`automatically`: indicates that on an unhandled Brooklyn failover (DanglingWorkflowException), the workflow should attempt to replay resuming; implies `enabled`,
// can be combined with `from start`
boolean replayableAutomatically = (replayable.contains("automatically"));
if (replayableAutomatically) replayable = replayable.replace("automatically", "").trim();
if (!replayableAutomatically && replayable.equals("enabled")) { replayable = ""; }
boolean replayableDisabled = !replayableAutomatically && replayable.equals("disabled");
if (replayableDisabled) replayable = "";
boolean replayableFromStart = replayable.equals("from start");
if (replayableFromStart) replayable = "";
if (!Strings.isBlank(replayable)) {
if (!replayableAutomatically && isNestedWorkflowStep)
validateReplayableAndIdempotentAtStep(replayable, idempotent, false);
else if (replayableAutomatically)
throw new IllegalArgumentException("Invalid 'replayable' value: 'automatically' cannot be used with '"+replayable+"'");
else
throw new IllegalArgumentException("Invalid 'replayable' value: '"+replayable+"'");
}
return ctx -> {
if (replayableFromStart) {
ctx.replayableFromStart = replayableFromStart;
ctx.replayableLastStep = WorkflowExecutionContext.STEP_INDEX_FOR_START;
}
ctx.replayableAutomatically = replayableAutomatically ? true : null;
ctx.replayableDisabled = replayableDisabled ? true : null;
ctx.idempotentAll = idempotentAll ? true : null;
};
}
public static Consumer<WorkflowExecutionContext> updaterForReplayableAtWorkflow(ConfigBag paramsDefiningWorkflow, boolean isNestedWorkflowStep) {
return updaterForReplayableAtWorkflow(paramsDefiningWorkflow.get(WorkflowCommonConfig.REPLAYABLE), paramsDefiningWorkflow.get(WorkflowCommonConfig.IDEMPOTENT), isNestedWorkflowStep);
}
public static void updateReplayableFromStep(WorkflowExecutionContext context, WorkflowStepDefinition step) {
Pair<ReplayableAtStepOption, Boolean> opt = step.validateReplayableAndIdempotent();
// idempotence on RHS ignored here, considered when we find the last
if (opt.getLeft()!=null && opt.getLeft().isReplayPoint) {
context.oldStepInfo.get(context.currentStepIndex).replayableFromHere = true;
context.replayableLastStep = context.currentStepIndex;
}
if (opt.getLeft()!=null && opt.getLeft().forcesReset) {
context.replayableLastStep = null;
context.oldStepInfo.forEach( (k,v) -> {
v.replayableFromHere = (opt.getLeft().isReplayPoint && Objects.equals(k, context.currentStepIndex)) ? true : null;
});
}
}
public enum ReplayableAtStepOption {
RESET(false, true), FROM_HERE(true, false), FROM_HERE_ONLY(true, true);
private final boolean isReplayPoint;
private final boolean forcesReset;
ReplayableAtStepOption(boolean isReplayPoint, boolean forcesReset) {
this.isReplayPoint = isReplayPoint;
this.forcesReset = forcesReset;
}
public static Maybe<ReplayableAtStepOption> ofMaybe(String replayable) {
if (Strings.isBlank(replayable)) return Maybe.absentNull();
if ("reset".equalsIgnoreCase(replayable)) return Maybe.of(RESET);
if ("from here".equalsIgnoreCase(replayable)) return Maybe.of(FROM_HERE);
if ("from here only".equalsIgnoreCase(replayable)) return Maybe.of(FROM_HERE_ONLY);
return Maybe.absent("Invalid 'replayable' value: "+replayable);
}
}
public static Pair<ReplayableAtStepOption,Boolean> validateReplayableAndIdempotentAtStep(String replayable, String idempotent, boolean asWorkflowDefinition) {
if (replayable==null) replayable = "";
if (idempotent==null) idempotent = "";
replayable = replayable.toLowerCase().replaceAll("[^a-z]+", " ").trim();
idempotent = idempotent.toLowerCase().replaceAll("[^a-z]+", " ").trim();
Maybe<ReplayableAtStepOption> rv = Strings.isBlank(replayable) ? Maybe.ofAllowingNull(null) : ReplayableAtStepOption.ofMaybe(replayable);
Maybe<Boolean> id = validateIdempotentAtStep(idempotent);
if (asWorkflowDefinition) {
// this will through exception if unacceptable
updaterForReplayableAtWorkflow(rv.isPresent() ? null : replayable, id.isPresent() ? null : idempotent, false);
} else {
rv.get();
id.get();
}
return Pair.of(rv.orNull(), id.orNull());
}
private static Maybe<Boolean> validateIdempotentAtStep(String idempotent) {
if (Strings.isBlank(idempotent) || idempotent.equals("default")) return Maybe.ofAllowingNull(null);
if (idempotent.equals("yes") || idempotent.equals("true")) return Maybe.of(true);
if (idempotent.equals("no") || idempotent.equals("false")) return Maybe.of(false);
return Maybe.absent("Invalid value for idempotent: '"+idempotent+"'");
}
public static Integer findNearestReplayPoint(final WorkflowExecutionContext context, final int stepIndex0) {
return findNearestReplayPoint(context, stepIndex0, true);
}
public static Integer findNearestReplayPoint(final WorkflowExecutionContext context, final int stepIndex0, boolean allowInclusive) {
int stepIndex = stepIndex0;
Set<Integer> considered = MutableSet.of();
Set<Integer> possibleOthers = MutableSet.of();
while (true) {
if (allowInclusive && WorkflowReplayUtils.isReplayableFromStep(context, stepIndex)) {
break;
}
allowInclusive = true;
if (stepIndex == WorkflowExecutionContext.STEP_INDEX_FOR_START) {
return null;
}
// look at the previous step
WorkflowExecutionContext.OldStepRecord osi = context.oldStepInfo.get(stepIndex);
if (osi == null) {
log.warn("Unable to backtrack from step " + stepIndex + "; no step information. Will try to replay from start.");
stepIndex = WorkflowExecutionContext.STEP_INDEX_FOR_START;
continue;
}
Set<Integer> prev = osi.previous;
if (prev == null || prev.isEmpty()) {
log.warn("Unable to backtrack from step " + stepIndex + "; no previous step recorded. Will try to replay from start.");
stepIndex = WorkflowExecutionContext.STEP_INDEX_FOR_START;
continue;
}
boolean repeating = !considered.add(stepIndex);
if (repeating) {
if (possibleOthers.size() != 1) {
log.warn("Unable to backtrack from step " + stepIndex + "; ambiguous precedents " + prev + " / " + possibleOthers + ". Will try to replay from start.");
stepIndex = WorkflowExecutionContext.STEP_INDEX_FOR_START;
continue;
} else {
stepIndex = possibleOthers.iterator().next();
continue;
}
}
Iterator<Integer> prevI = prev.iterator();
stepIndex = prevI.next();
while (prevI.hasNext()) {
possibleOthers.add(prevI.next());
}
}
return stepIndex;
}
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class WorkflowReplayRecord {
String taskId;
String reasonForReplay;
String submittedByTaskId;
long submitTimeUtc;
long startTimeUtc;
long endTimeUtc;
String status;
Boolean isError;
Object result;
private static void add(WorkflowExecutionContext ctx, Task<?> task, String reasonForReplay) {
WorkflowReplayRecord wrr = new WorkflowReplayRecord();
wrr.taskId = task.getId();
wrr.reasonForReplay = reasonForReplay;
ctx.replays.add(wrr);
ctx.replayCurrent = wrr;
update(ctx, task);
}
private static void updateInternal(WorkflowExecutionContext ctx, Task<?> task, Boolean forceEndSuccessOrError, Object result) {
if (ctx.replayCurrent ==null || ctx.replayCurrent.taskId!=task.getId()) {
log.warn("Mismatch in workflow replays for "+ctx+": "+ctx.replayCurrent +" vs "+task);
return;
}
// try hard to get submitter data in case tasks go awol before execution
if (task.getSubmittedByTaskId()!=null) {
ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId();
} else if (ctx.replayCurrent.submittedByTaskId==null && Tasks.current()!=null && !Tasks.current().equals(task)) {
ctx.replayCurrent.submittedByTaskId = Tasks.current().getId();
}
ctx.replayCurrent.submitTimeUtc = task.getSubmitTimeUtc();
// fake this because we won't see the real value until we also see the start value.
// however we need to ensure any workflow that is created is intended to be run.
if (ctx.replayCurrent.submitTimeUtc<=0) ctx.replayCurrent.submitTimeUtc = Instant.now().toEpochMilli();
ctx.replayCurrent.startTimeUtc = task.getStartTimeUtc();
ctx.replayCurrent.endTimeUtc = task.getEndTimeUtc();
ctx.replayCurrent.status = task.getStatusSummary();
if (forceEndSuccessOrError==null) {
ctx.replayCurrent.isError = task.isDone() ? task.isError() : null;
try {
ctx.replayCurrent.result = task.isDone() ? task.get() : null;
} catch (Throwable t) {
ctx.replayCurrent.result = Exceptions.collapseTextInContext(t, task);
}
} else {
// when forcing end, we are invoked _by_ the task so we fake the completion information
if (ctx.replayCurrent.endTimeUtc <= 0) {
ctx.replayCurrent.endTimeUtc = System.currentTimeMillis();
ctx.replayCurrent.status = forceEndSuccessOrError ? "Completed" : "Failed";
}
ctx.replayCurrent.isError = !forceEndSuccessOrError;
ctx.replayCurrent.result = result;
}
}
private static void update(WorkflowExecutionContext ctx, Task<?> task) {
updateInternal(ctx, task, null, null);
}
public String getTaskId() {
return taskId;
}
@Override
public String toString() {
return super.toString()+"[task="+taskId+"]";
}
}
/** called when the task is being created */
public static void updateOnWorkflowStartOrReplay(WorkflowExecutionContext ctx, Task<?> task, String reasonForReplay, Integer fixedStepToReplayFrom) {
WorkflowReplayRecord.add(ctx, task, reasonForReplay);
if (fixedStepToReplayFrom!=null) {
// if an explicit step for replay given, that becomes the last step to replay from
ctx.replayableLastStep = fixedStepToReplayFrom;
}
}
public static void updateOnWorkflowSuccess(WorkflowExecutionContext ctx, Task<?> task, Object result) {
WorkflowReplayRecord.updateInternal(ctx, task, true, result);
ctx.replayableLastStep = WorkflowExecutionContext.STEP_INDEX_FOR_END;
}
public static void updateOnWorkflowError(WorkflowExecutionContext ctx, Task<?> task, Throwable error) {
WorkflowReplayRecord.updateInternal(ctx, task, false, Exceptions.collapseTextInContext(error, task));
// no change to last replayable step
}
/** called when the workflow task is starting to run, after WorkflowStartOrReplay */
public static void updateOnWorkflowTaskStartupOrReplay(WorkflowExecutionContext ctx, Task<?> task, List<WorkflowStepDefinition> stepsResolved, boolean firstRun, Integer optionalReplayStep) {
WorkflowReplayRecord.updateInternal(ctx, task, null, null);
}
/** creates a task to replay the subworkflow, returning it, or null if the workflow completed successfully, or throwing if the workflow cannot be replayed */
private static Task<Object> createReplayResumingSubWorkflowTaskOrThrow(WorkflowExecutionContext subWorkflow, WorkflowStepDefinition.ReplayContinuationInstructions instructions, boolean allowInternallyEvenIfDisabled) {
if (instructions.stepToReplayFrom!=null) {
// shouldn't come here
throw new IllegalStateException("Cannot replay a nested workflow where the parent started at a specific step");
} else if (instructions.forced && instructions.customBehavior !=null) {
// forced, eg throwing exception
log.debug("Creating task to replay subworkflow " + subWorkflow+" from last, forced with custom behaviour - "+instructions);
return subWorkflow.factory(allowInternallyEvenIfDisabled).createTaskReplaying(subWorkflow.factory(allowInternallyEvenIfDisabled).makeInstructionsForReplayResumingForcedWithCustom(instructions.customBehaviorExplanation, instructions.customBehavior));
} else {
if (Objects.equals(subWorkflow.replayableLastStep, WorkflowExecutionContext.STEP_INDEX_FOR_END)) {
log.debug("Creating task to replay subworkflow " + subWorkflow+" from last, but already at end - "+instructions);
return null;
}
// may throw if not forced and not replayable
log.debug("Creating task to replay subworkflow " + subWorkflow+" from last: "+instructions);
WorkflowStepDefinition.ReplayContinuationInstructions subInstr = subWorkflow.factory(allowInternallyEvenIfDisabled).makeInstructionsForReplayResuming(instructions.customBehaviorExplanation, instructions.forced);
log.debug("Creating task to replay subworkflow " + subWorkflow+", will use: "+subInstr);
return subWorkflow.factory(allowInternallyEvenIfDisabled).createTaskReplaying(subInstr);
}
}
/** replays the workflow indicated by the set, returning the result, or if not possible creates a new task */
public static Object replayResumingInSubWorkflow(String summary, WorkflowStepInstanceExecutionContext context, WorkflowExecutionContext w, WorkflowStepDefinition.ReplayContinuationInstructions instructions, BiFunction<WorkflowExecutionContext,Exception,Object> ifNotReplayable, boolean allowInternallyEvenIfDisabled) {
Pair<Boolean, Object> check = checkReplayResumingInSubWorkflowAlsoReturningTaskOrResult(summary, context, w, instructions, ifNotReplayable, allowInternallyEvenIfDisabled);
if (check.getLeft()) return DynamicTasks.queue((Task<?>) check.getRight()).getUnchecked();
else return check.getRight();
}
public static Pair<Boolean,Object> checkReplayResumingInSubWorkflowAlsoReturningTaskOrResult(String summary, WorkflowStepInstanceExecutionContext context, WorkflowExecutionContext w, WorkflowStepDefinition.ReplayContinuationInstructions instructions, BiFunction<WorkflowExecutionContext,Exception,Object> ifNotReplayable, boolean allowInternallyEvenIfDisabled) {
Task<Object> t;
try {
t = WorkflowReplayUtils.createReplayResumingSubWorkflowTaskOrThrow(w, instructions, allowInternallyEvenIfDisabled);
if (t == null) {
// subworkflow completed
return Pair.of(false, w.getOutput());
}
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
log.debug("Step " + context.getWorkflowStepReference() + " could not resume nested workflow " + (w==null ? "<null>" : w.getWorkflowId()) + " (running alternate): "+e);
return Pair.of(false, ifNotReplayable.apply(w, e));
}
log.debug("Step " + context.getWorkflowStepReference() + " resuming nested workflow " + w.getWorkflowId() + " in task " + t.getId());
return Pair.of(true, t);
}
public static void setNewSubWorkflows(WorkflowStepInstanceExecutionContext context, List<BrooklynTaskTags.WorkflowTaskTag> tags, String supersededId) {
// make sure parent knows about child before child workflow is persisted, otherwise there is a chance the child workflow gets orphaned (if interrupted before parent persists)
// supersede old and save the new sub-workflow ID before submitting it, and before child knows about parent, per invoke effector step notes
markSubWorkflowsSupersededByTask(context, supersededId);
tags.forEach(nw -> addNewSubWorkflow(context, nw));
context.getWorkflowExectionContext().persist();
}
public static void markSubWorkflowsSupersededByTask(WorkflowStepInstanceExecutionContext context, String supersededId) {
context.getSubWorkflows().forEach(tag -> tag.setSupersededByTaskId(supersededId));
}
public static boolean addNewSubWorkflow(WorkflowStepInstanceExecutionContext context, BrooklynTaskTags.WorkflowTaskTag nw) {
if (nw==null) throw new IllegalArgumentException("Workflow tag must not be null");
return context.getSubWorkflows().add(nw);
}
public static WorkflowStepDefinition.SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled,
Consumer<WorkflowStepDefinition.SubWorkflowsForReplay> ifNoSubworkflows) {
Set<BrooklynTaskTags.WorkflowTaskTag> sws = context.getSubWorkflows();
WorkflowStepDefinition.SubWorkflowsForReplay result = new WorkflowStepDefinition.SubWorkflowsForReplay();
if (sws!=null && !sws.isEmpty()) {
// replaying
List<WorkflowExecutionContext> nestedWorkflowsToReplay = sws.stream().filter(tag -> tag.getSupersededByTaskId()==null)
.map(tag -> {
Entity targetEntity = (Entity) context.getManagementContext().lookup(tag.getEntityId());
if (targetEntity == null) {
log.warn("Unable to find entity for sub-workflow "+tag+" in "+context+"; assuming entity has gone, and may trigger recomputation of targets");
return null;
}
return new WorkflowStatePersistenceViaSensors(context.getManagementContext()).getWorkflows(targetEntity).get(tag.getWorkflowId());
}).collect(Collectors.toList());
result.subworkflows = nestedWorkflowsToReplay;
if (nestedWorkflowsToReplay.isEmpty()) {
if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has all sub workflows superseded; replaying from start");
if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" superseded sub workflows detail: "+sws+" -> "+nestedWorkflowsToReplay);
ifNoSubworkflows.accept(result);
} else if (nestedWorkflowsToReplay.contains(null)) {
if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has uninitialized sub workflows; replaying from start");
if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" uninitialized/unpersisted sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay);
ifNoSubworkflows.accept(result);
} else if (!forced && nestedWorkflowsToReplay.stream().anyMatch(nest -> !isReplayableAnywhere(nest, allowInternallyEvenIfDisabled))) {
if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has non-replayable sub workflows; replaying from start");
if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" non-replayable sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay);
result.hasNonResumableWorkflows = true;
// parent not resumable
} else {
if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" replay sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay);
result.isResumableAtSubworkflows = true;
}
} else {
ifNoSubworkflows.accept(result);
}
return result;
}
public static Object getNext(Object ...sources) {
Object result = null;
for (Object o: sources) {
if (o==null) continue;
if (o instanceof WorkflowStepInstanceExecutionContext) result = ((WorkflowStepInstanceExecutionContext)o).next;
else if (o instanceof WorkflowStepDefinition) result = ((WorkflowStepDefinition)o).next;
else if (o instanceof String || o instanceof WorkflowStepDefinition.ReplayContinuationInstructions) result = o;
else throw new IllegalArgumentException("Next not supported for "+o+" (type "+o.getClass()+")");
if (result!=null) break;
}
return result;
}
}