blob: 88962ea7122edf74cb05329ca04503e977cda05c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.EntityManagementSupport;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
import org.apache.brooklyn.entity.stock.BasicApplication;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.ClassLogWatcher;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicApplication> {
// Logger used by the tests in this class for diagnostic output.
private static final Logger log = LoggerFactory.getLogger(WorkflowPersistReplayErrorsTest.class);
// Application under test; assigned by runSteps(...) and re-assigned by the tests that rebind.
private BasicApplication app;
/**
 * Registers the workflow step types on the supplied management context and then
 * applies the parent fixture's decoration.
 */
@Override
protected LocalManagementContext decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
    // step types must be known before any workflow is defined against this context
    WorkflowBasicTest.addWorkflowStepTypes(mgmt);
    LocalManagementContext decorated = super.decorateOrigOrNewManagementContext(mgmt);
    return decorated;
}
/**
 * Returns {@code null}: no application is created up front by this fixture hook;
 * {@code runSteps(...)} creates a fresh application for each workflow run instead.
 */
@Override
protected BasicApplication createApp() {
return null;
}
/**
 * Rebinds using options that request termination of the original management context.
 */
@Override
protected BasicApplication rebind() throws Exception {
    RebindOptions options = RebindOptions.create().terminateOrigManagementContext(true);
    return rebind(options);
}
/**
 * Convenience for running a workflow consisting of a single step.
 *
 * @param step the step definition
 * @param appFunction optional callback applied to the new application before the effector is added; may be null
 * @return the task for the workflow effector invocation
 */
Task<?> runStep(Object step, Consumer<BasicApplication> appFunction) {
    List<Object> singleStep = MutableList.<Object>of(step);
    return runSteps(singleStep, appFunction);
}
/**
 * Runs the given steps as a workflow effector on a new application, with no extra effector config.
 *
 * @param steps the workflow step definitions
 * @param appFunction optional callback applied to the new application before the effector is added; may be null
 * @return the task for the workflow effector invocation
 */
Task<?> runSteps(List<Object> steps, Consumer<BasicApplication> appFunction) {
    final ConfigBag noExtraConfig = null;
    return runSteps(steps, appFunction, noExtraConfig);
}
/**
 * Creates a new {@link BasicApplication}, stores it in {@code app}, and runs the given steps
 * on it as an effector named "myWorkflow".
 *
 * @param steps the workflow step definitions
 * @param appFunction optional callback applied to the new application before the effector is added; may be null
 * @param initialEffectorConfig extra effector configuration copied into the effector definition; may be null
 * @return the task for the workflow effector invocation
 */
Task<?> runSteps(List<Object> steps, Consumer<BasicApplication> appFunction, ConfigBag initialEffectorConfig) {
    EntitySpec<BasicApplication> spec = EntitySpec.create(BasicApplication.class);
    this.app = mgmt().getEntityManager().createEntity(spec);
    return runStepsOnExistingApp("myWorkflow", steps, appFunction, initialEffectorConfig);
}
/**
 * Adds a workflow effector with the given name and steps to the current {@code app}, then invokes it.
 *
 * @param effectorName name under which the effector is registered and subsequently looked up
 * @param steps the workflow step definitions
 * @param appFunction optional callback applied to {@code app} before the effector is added; may be null
 * @param initialEffectorConfig extra effector configuration; may be null
 * @return the task for the effector invocation
 */
Task<?> runStepsOnExistingApp(String effectorName, List<Object> steps, Consumer<BasicApplication> appFunction, ConfigBag initialEffectorConfig) {
addEffector(effectorName, steps, appFunction, initialEffectorConfig);
// invoke with no parameters
return app.invoke(app.getEntityType().getEffectorByName(effectorName).get(), null);
}
/**
 * Builds a {@link WorkflowEffector} from the name, steps and any extra config, and applies it to {@code app}.
 * The optional {@code appFunction} is run against {@code app} after the effector is constructed
 * but before it is applied.
 *
 * @return the effector that was applied
 */
private WorkflowEffector addEffector(String effectorName, List<Object> steps, Consumer<BasicApplication> appFunction, ConfigBag initialEffectorConfig) {
    ConfigBag effectorConfig = ConfigBag.newInstance()
            .configure(WorkflowEffector.EFFECTOR_NAME, effectorName)
            .configure(WorkflowEffector.STEPS, steps)
            .copy(initialEffectorConfig);
    WorkflowEffector effector = new WorkflowEffector(effectorConfig);

    if (appFunction != null) {
        appFunction.accept(app);
    }

    effector.apply((EntityLocal) app);
    return effector;
}
// Task of the most recent workflow invocation (or replay) started by a test.
Task<?> lastInvocation;
// Most recently looked-up workflow context, as read back from the workflow persistence sensors.
WorkflowExecutionContext lastWorkflowContext;
/**
 * Workflow steps shared by several tests: publish sensor {@code x} incremented by one, wait on the
 * {@code gate} sensor, then add ten to {@code x}, publish it again and return it.
 * Starting from an unset sensor the tests expect {@code x} to be 1 while waiting and 11 at the end.
 */
public static final List<Object> INCREMENTING_X_STEPS =
        MutableList.<Object>of(
                "let integer x = ${entity.sensor.x} ?? 0",
                "let x = ${x} + 1",
                "set-sensor x = ${x}",
                "wait ${entity.attributeWhenReady.gate}",
                "let x = ${entity.sensor.x} + 10",
                "set-sensor x = ${x}",
                "return ${x}"
        ).asUnmodifiable();
/**
 * Starts the {@code INCREMENTING_X_STEPS} workflow on a new application, recording the task in
 * {@code lastInvocation}, and checks that exactly one workflow is already recorded on the entity.
 */
private void runIncrementingX() {
lastInvocation = runSteps(INCREMENTING_X_STEPS, null);
// ensure workflow sensor set immediately (this is done synchronously when effector invocation task is created, to ensure it can be resumed)
findSingleLastWorkflow();
}
/**
 * Starts the incrementing-x workflow and waits until sensor {@code x} has reached 1,
 * i.e. the workflow has published its first value; then re-reads the workflow context.
 */
private void runIncrementingXAwaitingGate() {
    runIncrementingX();

    AttributeSensor<Object> x = Sensors.newSensor(Object.class, "x");
    EntityAsserts.assertAttributeEqualsEventually(app, x, 1);

    // refresh this for good measure (it won't have changed but feels like bad practice to rely on that)
    Map<String, WorkflowExecutionContext> persisted = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
    lastWorkflowContext = persisted.values().iterator().next();
}
/**
 * Checks the persisted workflow state while the workflow is paused on the gate, lets it finish,
 * then replays it from step 1 and checks that it pauses and completes again with the same result.
 */
@Test
public void testWorkflowSensorValuesWhenPausedAndCanReplay() throws IOException {
    final AttributeSensor<Object> x = Sensors.newSensor(Object.class, "x");
    final AttributeSensor<Boolean> gate = Sensors.newBooleanSensor("gate");

    runIncrementingXAwaitingGate();
    EntityAsserts.assertAttributeEqualsEventually(app, x, 1);

    // workflow is either on the set-sensor step or on the wait step, and still running
    Integer stepIndex = lastWorkflowContext.getCurrentStepIndex();
    Asserts.assertTrue(stepIndex >= 2 && stepIndex <= 3, "Index is "+stepIndex);
    Asserts.assertEquals(lastWorkflowContext.status, WorkflowExecutionContext.WorkflowStatus.RUNNING);
    Asserts.assertFalse(lastInvocation.isDone());

    // open the gate; first run should complete with 11
    app.sensors().set(gate, true);
    lastInvocation.blockUntilEnded(Duration.seconds(2));
    Asserts.assertEquals(lastInvocation.getUnchecked(), 11);
    Asserts.assertEquals(lastWorkflowContext.status, WorkflowExecutionContext.WorkflowStatus.SUCCESS);

    // close the gate and replay from step 1
    app.sensors().set(gate, false);
    Task<Object> replay = DynamicTasks.submit(
            lastWorkflowContext.factory(false).createTaskReplaying(
                    lastWorkflowContext.factory(false).makeInstructionsForReplayingFromStep(1, "Test", true)),
            app);

    // sensor should go back to 1 because workflow vars are stored per-state
    EntityAsserts.assertAttributeEqualsEventually(app, x, 1);
    Time.sleep(10);
    Asserts.assertFalse(replay.isDone());

    app.sensors().set(gate, true);
    replay.blockUntilEnded(Duration.seconds(2));
    Asserts.assertEquals(replay.getUnchecked(), 11);
}
/**
 * Cancels the workflow while it waits on the gate, checks that it ends in an error state with
 * sensor {@code x} still at 1, and then resumes it and expects the final value 11.
 */
@Test
public void testWorkflowInterruptedAndCanReplay() throws IOException {
    final AttributeSensor<Object> x = Sensors.newSensor(Object.class, "x");
    final AttributeSensor<Boolean> gate = Sensors.newBooleanSensor("gate");

    runIncrementingXAwaitingGate();

    // cancel the workflow
    lastWorkflowContext.getTask(true).get().cancel(true);

    // workflow should no longer proceed even if gate is set
    app.sensors().set(gate, true);
    lastInvocation.blockUntilEnded(Duration.seconds(2));
    Asserts.assertTrue(lastInvocation.isError());

    // workflow should go to error (before invocation ended)
    // TODO should show error when persisted
    lastWorkflowContext.persist();
    Asserts.assertThat(lastWorkflowContext.status, status -> status.error);

    // the cancelled run must not have progressed past the first published value
    EntityAsserts.assertAttributeEquals(app, x, 1);
    Integer stepIndex = lastWorkflowContext.getCurrentStepIndex();
    Asserts.assertTrue(stepIndex >= 2 && stepIndex <= 3, "Index is "+stepIndex);

    Task<Object> resumed = DynamicTasks.submit(
            lastWorkflowContext.factory(false).createTaskReplaying(
                    lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", true)),
            app);

    // the gate is set so this will finish soon
    Asserts.assertEquals(resumed.getUnchecked(), 11);
    EntityAsserts.assertAttributeEquals(app, x, 11);
}
/**
 * Runs the callable, first switching {@code EntityManagementSupport.AUTO_FAIL_AND_RESUME_WORKFLOWS}
 * off when {@code autoFailAndResume} is false, and restoring the previous value afterwards.
 * When {@code autoFailAndResume} is true the flag is left exactly as it is.
 *
 * @param autoFailAndResume whether the callable should run with the flag untouched (true) or forced off (false)
 * @param callable the work to run
 * @return whatever the callable returns
 * @throws Exception anything thrown by the callable
 */
public static <T> T possiblyWithAutoFailAndReplay(boolean autoFailAndResume, Callable<T> callable) throws Exception {
    if (autoFailAndResume) {
        // nothing to override, nothing to restore
        return callable.call();
    }

    final Boolean previous = EntityManagementSupport.AUTO_FAIL_AND_RESUME_WORKFLOWS;
    EntityManagementSupport.AUTO_FAIL_AND_RESUME_WORKFLOWS = false;
    try {
        return callable.call();
    } finally {
        if (previous != null) {
            EntityManagementSupport.AUTO_FAIL_AND_RESUME_WORKFLOWS = previous;
        }
    }
}
/**
 * {@link Runnable} variant of {@code possiblyWithAutoFailAndReplay}: adapts the runnable to a
 * callable and rethrows any checked exception via {@code Exceptions.propagate}.
 *
 * @param autoFailAndResume whether to run with the flag untouched (true) or forced off (false)
 * @param runnable the work to run
 */
public static void possiblyWithAutoFailAndReplay(boolean autoFailAndResume, Runnable runnable) {
    Callable<Void> adapter = () -> {
        runnable.run();
        return null;
    };
    try {
        possiblyWithAutoFailAndReplay(autoFailAndResume, adapter);
    } catch (Exception e) {
        throw Exceptions.propagate(e);
    }
}
/** Runs the shutdown-and-replay scenario with {@code autoFailAndReplay} set to false. */
@Test(groups="Integration", invocationCount = 10) // because slow, and tests variety of interruption points with randomised delay
public void testWorkflowShutdownAndCanReplay() throws Exception {
doTestWorkflowShutdownAndCanReplay(false);
}
/** Runs the shutdown-and-replay scenario with {@code autoFailAndReplay} set to true. */
@Test(groups="Integration", invocationCount = 10) // because slow, and tests variety of interruption points with randomised delay
public void testWorkflowShutdownAndAutomaticReplay() throws Exception {
doTestWorkflowShutdownAndCanReplay(true);
}
/**
 * Starts the incrementing-x workflow, rebinds after a short random delay, terminates the old
 * management context, and then checks that the workflow can be resumed on the rebound application
 * and completes with 11 once the gate is set.
 *
 * @param autoFailAndReplay when true the auto-fail-and-resume flag is left untouched and the rebound
 *        workflow is expected to have ended with a dangling-workflow error before the manual resume;
 *        when false the flag is forced off and the persisted status is expected to be not ended
 */
public void doTestWorkflowShutdownAndCanReplay(boolean autoFailAndReplay) throws Exception {
runIncrementingX();
// variable delay, shouldn't matter when it is interrupted
Time.sleep((long) (Math.random()*Math.random()*200));
possiblyWithAutoFailAndReplay(autoFailAndReplay, () -> {
// do rebind
ManagementContext oldMgmt = mgmt();
app = rebind();
// shutdown flags should get set on old object when destroyed
((ManagementContextInternal) oldMgmt).terminate();
// lastWorkflowContext still refers to the context read before the rebind
WorkflowExecutionContext prevWorkflowContext = lastWorkflowContext;
Asserts.eventually(() -> prevWorkflowContext.status, status -> status.error);
Asserts.assertEquals(prevWorkflowContext.status, WorkflowExecutionContext.WorkflowStatus.ERROR_SHUTDOWN);
// new object is present
lastWorkflowContext = findSingleLastWorkflow();
if (autoFailAndReplay) {
assertReplayedAfterRebindAndEventuallyThrowsDangling(lastWorkflowContext);
} else {
// status should have been persisted as not ended
Asserts.assertEquals(lastWorkflowContext.status.ended, false);
}
// the interruption happened no later than the wait step (index 3)
Asserts.assertThat(lastWorkflowContext.currentStepIndex, x -> x == null || x <= 3);
// sensor might be one or might be null
Asserts.assertThat(app.sensors().get(Sensors.newSensor(Integer.class, "x")), x -> x == null || x == 1);
// now we can tell it to resume from where it crashed
lastInvocation = Entities.submit(app, lastWorkflowContext.factory(false).createTaskReplaying(lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", true)));
// will wait on gate, ie not finish
Time.sleep((long) (Math.random() * Math.random() * 200));
Asserts.assertFalse(lastInvocation.isDone());
// workflow should now complete when gate is set
app.sensors().set(Sensors.newBooleanSensor("gate"), true);
Asserts.assertEquals(lastInvocation.get(), 11);
EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(Object.class, "x"), 11);
// returning a value selects the Callable overload, which allows checked exceptions from rebind()/get()
return null;
});
}
/**
 * Reads the workflows recorded on {@code app}, asserts there is exactly one, stores it in
 * {@code lastWorkflowContext} and returns it.
 */
private WorkflowExecutionContext findSingleLastWorkflow() {
    WorkflowStatePersistenceViaSensors persistence = new WorkflowStatePersistenceViaSensors(mgmt());
    Map<String, WorkflowExecutionContext> persisted = persistence.getWorkflows(app);
    Asserts.assertSize(persisted, 1);

    WorkflowExecutionContext only = persisted.values().iterator().next();
    lastWorkflowContext = only;
    return only;
}
/**
 * Looks up the workflow with the given id among those recorded on {@code app}, stores it in
 * {@code lastWorkflowContext} (even when absent) and asserts that it was found.
 *
 * @param workflowId key of the workflow in the persisted workflow map
 * @return the non-null workflow context
 */
private WorkflowExecutionContext findLastWorkflow(String workflowId) {
    Map<String, WorkflowExecutionContext> persisted = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
    lastWorkflowContext = persisted.get(workflowId);
    return Asserts.assertNotNull(lastWorkflowContext, "Cannot find workflow for "+workflowId);
}
/**
 * Finds the task recorded for the given workflow context, waits for it to end, and asserts that
 * the workflow is in ERROR status and that the task failed with a {@link DanglingWorkflowException}
 * somewhere in its cause chain. The task is stored in {@code lastInvocation}.
 */
private void assertReplayedAfterRebindAndEventuallyThrowsDangling(WorkflowExecutionContext context) {
    lastInvocation = mgmt().getExecutionManager().getTask(context.getTaskId());
    Asserts.assertNotNull(lastInvocation);
    lastInvocation.blockUntilEnded();

    // should get the dangling error
    Asserts.assertEquals(context.status, WorkflowExecutionContext.WorkflowStatus.ERROR);
    try {
        lastInvocation.get();
        Asserts.shouldHaveFailedPreviously("Expected to throw "+DanglingWorkflowException.class);
    } catch (Exception ex) {
        // a failure is expected, but only if it is the dangling-workflow one
        boolean isDangling = Exceptions.getFirstThrowableOfType(ex, DanglingWorkflowException.class) != null;
        if (!isDangling) {
            throw new AssertionError("Wrong exception: "+ex, ex);
        }
    }
}
/**
 * Destroys the management context while the workflow is waiting on the gate and checks that the
 * invocation fails and the workflow context ends up in one of the error statuses.
 */
@Test
public void testShutdownNotedIfManagementStopped() throws IOException {
runIncrementingXAwaitingGate();
Entities.destroyAll(mgmt());
lastInvocation.blockUntilEnded(Duration.seconds(5));
Asserts.assertTrue(lastInvocation.isError());
// error is set, although it will not usually be persisted
Asserts.eventually(() -> lastWorkflowContext.status, status -> status.error);
if (lastWorkflowContext.status == WorkflowExecutionContext.WorkflowStatus.ERROR_SHUTDOWN || lastWorkflowContext.status == WorkflowExecutionContext.WorkflowStatus.ERROR_ENTITY_DESTROYED) {
// as expected
} else if (lastWorkflowContext.status == WorkflowExecutionContext.WorkflowStatus.ERROR) {
// sometimes happens; to be investigated
// tolerated: only a warning is logged, the test does not fail
log.warn("Workflow ended with error, not error shutdown; value:\n"+lastInvocation.getStatusDetail(true));
} else {
// any other status fails the test
log.error("Workflow ended with wrong error status: "+lastWorkflowContext.status);
Asserts.fail("Workflow ended with wrong error status: "+lastWorkflowContext.status+ " / value:\n"+lastInvocation.getStatusDetail(true));
}
}
/**
 * Nested-shutdown scenario with {@code autoFailAndReplay} true, where the nested work is an
 * {@code invoke-effector} call to an "incrementXWithGate" effector running {@code INCREMENTING_X_STEPS}.
 */
@Test(groups="Integration", invocationCount = 10) // because a bit slow and non-deterministic
public void testNestedEffectorShutdownAndReplayedAutomatically() throws Exception {
    Consumer<BasicApplication> addNestedEffector = entity -> {
        ConfigBag nestedConfig = ConfigBag.newInstance()
                .configure(WorkflowEffector.EFFECTOR_NAME, "incrementXWithGate")
                .configure(WorkflowEffector.STEPS, INCREMENTING_X_STEPS);
        new WorkflowEffector(nestedConfig).apply((EntityLocal) entity);
    };
    doTestNestedWorkflowShutdownAndReplayed(true, "invoke-effector incrementXWithGate", addNestedEffector);
}
/**
 * Nested-shutdown scenario with {@code autoFailAndReplay} true, where the nested work is an inline
 * step of type "workflow" running {@code INCREMENTING_X_STEPS}; no extra effector is installed.
 */
@Test(groups="Integration", invocationCount = 10) // because a bit slow and non-deterministic
public void testNestedWorkflowShutdownAndReplayedAutomatically() throws Exception {
doTestNestedWorkflowShutdownAndReplayed(true, MutableMap.of("type", "workflow", "steps", INCREMENTING_X_STEPS), null);
}
/**
 * Nested-shutdown scenario with {@code autoFailAndReplay} false, where the nested work is an
 * {@code invoke-effector} call to an "incrementXWithGate" effector whose steps are a log step
 * followed by {@code INCREMENTING_X_STEPS}.
 */
@Test(groups="Integration", invocationCount = 10) // because a bit slow and non-deterministic
public void testNestedEffectorShutdownAndReplayedManually() throws Exception {
    Consumer<BasicApplication> addNestedEffector = entity -> {
        ConfigBag nestedConfig = ConfigBag.newInstance()
                .configure(WorkflowEffector.EFFECTOR_NAME, "incrementXWithGate")
                .configure(WorkflowEffector.STEPS, MutableList.<Object>of("log running nested incrementX in ${workflow.name}").appendAll(INCREMENTING_X_STEPS));
        new WorkflowEffector(nestedConfig).apply((EntityLocal) entity);
    };
    doTestNestedWorkflowShutdownAndReplayed(false, "invoke-effector incrementXWithGate", addNestedEffector);
}
/**
 * Nested-shutdown scenario with {@code autoFailAndReplay} false, where the nested work is an inline
 * step of type "workflow" running {@code INCREMENTING_X_STEPS}; no extra effector is installed.
 */
@Test(groups="Integration", invocationCount = 10) // because a bit slow and non-deterministic
public void testNestedWorkflowShutdownAndReplayedManually() throws Exception {
doTestNestedWorkflowShutdownAndReplayed(false, MutableMap.of("type", "workflow", "steps", INCREMENTING_X_STEPS), null);
}
/**
 * Runs an outer workflow that increments sensor {@code y} around a nested call, once to completion
 * and then a second time which is interrupted by a rebind; the interrupted run is then resumed and
 * expected to finish with 2222.
 *
 * @param autoFailAndReplay when true the rebound workflow is expected to have failed with a
 *        dangling-workflow error before the manual resume; when false the auto-fail-and-resume
 *        flag is forced off and the persisted status is expected to be not ended
 * @param call the step definition used as the nested call (a step string or a step map)
 * @param initializer optional callback applied to the new application before the outer effector is added; may be null
 */
void doTestNestedWorkflowShutdownAndReplayed(boolean autoFailAndReplay, Object call, Consumer<BasicApplication> initializer) throws Exception {
lastInvocation = runSteps(MutableList.<Object>of(
"let y = ${entity.sensor.y} ?? 0",
"let y = ${y} + 1",
"set-sensor y = ${y}",
call,
"let x = ${workflow.previous_step.output}",
"let y = ${y} + 10",
"set-sensor y = ${y}",
"let z = ${y} * 100 + ${x}",
"return ${z}"
), initializer);
// run once with no interruption, make sure fine
app.sensors().set(Sensors.newBooleanSensor("gate"), true);
Asserts.assertEquals(lastInvocation.get(), 1111);
app.sensors().set(Sensors.newBooleanSensor("gate"), false);
// now invoke again
lastInvocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
// NOTE(review): workflowId is not used below; the lookup after rebind uses lastInvocation.getId() instead -- confirm these are the same id
String workflowId = BrooklynTaskTags.getWorkflowTaskTag(lastInvocation, false).getWorkflowId();
// interrupt at any point, probably when gated
Time.sleep((long) (Math.random()*Math.random()*200));
ManagementContext oldMgmt = mgmt();
possiblyWithAutoFailAndReplay(autoFailAndReplay, () -> {
app = rebind();
((ManagementContextInternal) oldMgmt).terminate();
// lastInvocation is still the task from before the rebind at this point
lastWorkflowContext = findLastWorkflow(lastInvocation.getId());
if (autoFailAndReplay) {
assertReplayedAfterRebindAndEventuallyThrowsDangling(lastWorkflowContext);
} else {
// status should have been persisted as not ended
Asserts.assertEquals(lastWorkflowContext.status.ended, false);
}
// resume the interrupted workflow on the rebound application
lastInvocation = Entities.submit(app, lastWorkflowContext.factory(false).createTaskReplaying(lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", true)));
// the gate is not set here, so the resumed run must still be blocked
if (lastInvocation.blockUntilEnded(Duration.millis(20))) {
Asserts.fail("Invocation ended when it shouldn't have, with "+lastInvocation.get());
}
app.sensors().set(Sensors.newBooleanSensor("gate"), true);
Asserts.assertEquals(lastInvocation.get(), 2222);
// wait for effector to be invoked
EntityAsserts.assertAttributeEqualsEventually(app, Sensors.newSensor(Object.class, "x"), 22);
EntityAsserts.assertAttributeEqualsEventually(app, Sensors.newSensor(Object.class, "y"), 22);
return null;
});
}
/**
 * Builds a shortened incrementing-x workflow (read x, add one, publish, wait on gate, return)
 * in which the first two steps can optionally be marked {@code replayable: from here}.
 *
 * @param replayableOnBuggyStep1 mark the step that reads the sensor as a replay point
 * @param replayableOnGoodStep2 mark the step that increments the variable as a replay point
 * @return an unmodifiable list of step definitions
 */
public static List<Object> INCREMENTING_X_STEPS_SETTING_REPLAYABLE(boolean replayableOnBuggyStep1, boolean replayableOnGoodStep2) {
    String replayableForStep1 = replayableOnBuggyStep1 ? "from here" : null;
    String replayableForStep2 = replayableOnGoodStep2 ? "from here" : null;

    return MutableList.<Object>of(
            MutableMap.of("s", "let integer x = ${entity.sensor.x} ?? 0", "replayable", replayableForStep1),
            MutableMap.of("s", "let x = ${x} + 1", "replayable", replayableForStep2),
            "set-sensor x = ${x}",
            "wait ${entity.attributeWhenReady.gate}",
            "return ${x}"
    ).asUnmodifiable();
}
/**
 * Starts the replayable-configurable incrementing-x workflow on a new application, recording the
 * task in {@code lastInvocation}, and checks that exactly one workflow is already recorded.
 */
private void runIncrementingXSettingReplayable(boolean replayableOnBuggyStep1, boolean replayableOnGoodStep2) {
    List<Object> steps = INCREMENTING_X_STEPS_SETTING_REPLAYABLE(replayableOnBuggyStep1, replayableOnGoodStep2);
    lastInvocation = runSteps(steps, null);

    // ensure workflow sensor set immediately (this is done synchronously when effector invocation task is created, to ensure it can be resumed)
    findSingleLastWorkflow();
}
/**
 * Starts the replayable-configurable incrementing-x workflow and waits until sensor {@code x}
 * has reached 1; then re-reads the workflow context.
 */
private void runIncrementingXAwaitingGateSettingReplayable(boolean replayableOnBuggyStep1, boolean replayableOnGoodStep2) {
    runIncrementingXSettingReplayable(replayableOnBuggyStep1, replayableOnGoodStep2);

    AttributeSensor<Object> x = Sensors.newSensor(Object.class, "x");
    EntityAsserts.assertAttributeEqualsEventually(app, x, 1);

    // refresh this for good measure (it won't have changed but feels like bad practice to rely on that)
    Map<String, WorkflowExecutionContext> persisted = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
    lastWorkflowContext = persisted.values().iterator().next();
}
/** Replay point on the sensor-reading step only, replaying from the last replay point; the buggy result is expected. */
@Test(groups="Integration") //because slow
public void testWorkflowInterruptedAndCanReplaySettingReplayableBuggy() throws IOException {
doTestWorkflowInterruptedAndCanReplaySettingReplayable(true, false, false, true);
}
/** Replay point on the increment step only, replaying from the last replay point; the non-buggy result is expected. */
@Test(groups="Integration") //because slow
public void testWorkflowInterruptedAndCanReplaySettingReplayableSensibleReplayPoint() throws IOException {
doTestWorkflowInterruptedAndCanReplaySettingReplayable(false, true, false, false);
}
/** No explicit replay points, resuming rather than replaying; the non-buggy result is expected. */
@Test(groups="Integration") //because slow
public void testWorkflowInterruptedAndCanReplaySettingReplayableSensibleResuming() throws IOException {
doTestWorkflowInterruptedAndCanReplaySettingReplayable(false, false, true, false);
}
/** Replay point on the sensor-reading step, but resuming rather than replaying; the non-buggy result is expected. */
@Test(groups="Integration") //because slow
public void testWorkflowInterruptedAndCanReplaySettingReplayableSensibleResumingEvenIfBuggyReplayPoint() throws IOException {
doTestWorkflowInterruptedAndCanReplaySettingReplayable(true, false, true, false);
}
/**
 * Cancels the replayable-configurable incrementing-x workflow while it waits on the gate, then
 * replays or resumes it and checks the final value of {@code x}.
 *
 * @param replayableOnBuggyStep1 mark the sensor-reading step as a replay point
 * @param replayableOnGoodStep2 mark the increment step as a replay point
 * @param isResuming resume from where the workflow stopped instead of replaying from the last replay point
 * @param expectBuggy whether the replay is expected to restart from step 0 and so increment {@code x} twice
 *        (final value 2) rather than once (final value 1)
 */
void doTestWorkflowInterruptedAndCanReplaySettingReplayable(boolean replayableOnBuggyStep1, boolean replayableOnGoodStep2, boolean isResuming, boolean expectBuggy) throws IOException {
    runIncrementingXAwaitingGateSettingReplayable(replayableOnBuggyStep1, replayableOnGoodStep2);

    // cancel the workflow
    lastWorkflowContext.getTask(true).get().cancel(true);
    lastInvocation.blockUntilEnded(Duration.seconds(2));

    // Objects.equals is used so that a null replayableLastStep fails the assertion
    // instead of throwing a NullPointerException while unboxing
    final int expected;
    if (expectBuggy) {
        // in buggy mode, should want to continue from 0
        Asserts.assertThat(lastWorkflowContext.replayableLastStep, step -> Objects.equals(step, 0));
        expected = 2;
    } else {
        if (!isResuming) {
            Asserts.assertThat(lastWorkflowContext.replayableLastStep, step -> Objects.equals(step, 1));
        }
        expected = 1;
    }

    // replay
    Task<Object> invocation2 = DynamicTasks.submit(lastWorkflowContext.factory(false).createTaskReplaying(
            isResuming
                    ? lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", false)
                    : lastWorkflowContext.factory(false).makeInstructionsForReplayingFromLastReplayable("test", false)
    ), app);

    // 2 if it replays from step 0 (the sensor is read again after it was already set to 1), otherwise 1
    app.sensors().set(Sensors.newBooleanSensor("gate"), true);
    Asserts.assertEquals(invocation2.getUnchecked(), expected);
    EntityAsserts.assertAttributeEquals(app, Sensors.newSensor(Object.class, "x"), expected);
}
/**
 * Outer workflow replayable from start and on the effector step; nested workflow replayable on its
 * sensor-reading step only. Expects last replay points outer=1, nested=0 and the buggy result.
 */
@Test(groups="Integration") //because slow
public void testNestedWorkflowInterruptedAndCanReplaySettingReplayable_BuggyNestedReplay() throws IOException {
doTestNestedEffectorWorkflowInterruptedAndCanReplaySettingReplayable(true, true, true, false, false,
1, 0, true);
}
/**
 * Only the outer workflow is replayable (from start). Expects last replay points outer=-1,
 * nested=null and the buggy result.
 */
@Test(groups="Integration") //because slow
public void testNestedWorkflowInterruptedAndCanReplaySettingReplayable_BuggyTopLevelReplay() throws IOException {
doTestNestedEffectorWorkflowInterruptedAndCanReplaySettingReplayable(true, false,false, false, false,
-1, null, true);
}
/**
 * Every available replay point is enabled and the workflow is replayed from the last replay point.
 * Expects last replay points outer=1, nested=1 and the buggy result.
 */
@Test(groups="Integration") //because slow
public void testNestedWorkflowInterruptedAndCanReplaySettingReplayable_AllPointsAreBuggy() throws IOException {
doTestNestedEffectorWorkflowInterruptedAndCanReplaySettingReplayable(true, true, true, true, false,
1, 1, true);
}
/**
 * Same replay points as the buggy-nested-replay case, but resuming rather than replaying.
 * Expects last replay points outer=1, nested=0 and the non-buggy result.
 */
@Test(groups="Integration") //because slow
public void testNestedWorkflowInterruptedAndCanReplaySettingReplayable_SensibleResuming() throws IOException {
doTestNestedEffectorWorkflowInterruptedAndCanReplaySettingReplayable(true, true, true, false, true,
1, 0, false);
}
/**
 * Runs an outer workflow ("sleep 20ms" then "invoke-effector nestedWorkflow") whose nested effector
 * is the replayable-configurable incrementing-x workflow, cancels it while the nested workflow is
 * waiting, checks the recorded last replay points of both workflows, and then replays or resumes
 * the outer workflow and checks the final value of {@code x}.
 *
 * @param replayableFromStartOuter configure the outer effector as replayable "from start"
 * @param replayableOnEffector mark the outer invoke-effector step as a replay point
 * @param replayableOnBuggyStep1 mark the nested sensor-reading step as a replay point
 * @param replayableOnGoodStep2 mark the nested increment step as a replay point
 * @param isResuming resume instead of replaying from the last replay point
 * @param expectedOuterStep expected {@code replayableLastStep} of the outer workflow; may be null
 * @param expectedNestedStep expected {@code replayableLastStep} of the nested workflow; may be null
 * @param expectBuggy whether {@code x} is expected to end up as 2 (incremented twice) rather than 1
 */
void doTestNestedEffectorWorkflowInterruptedAndCanReplaySettingReplayable(boolean replayableFromStartOuter, boolean replayableOnEffector,
        boolean replayableOnBuggyStep1, boolean replayableOnGoodStep2, boolean isResuming,
        Integer expectedOuterStep, Integer expectedNestedStep, boolean expectBuggy) throws IOException {
    final AttributeSensor<Object> x = Sensors.newSensor(Object.class, "x");
    final int expected = expectBuggy ? 2 : 1;

    // replay is only "good" if we set a replay point _after_ the sensor was read;
    // otherwise it increments x twice
    lastInvocation = runSteps(
            MutableList.of(
                    "sleep 20ms",
                    MutableMap.of("s", "invoke-effector nestedWorkflow", "replayable", replayableOnEffector ? "from here" : null)),
            entity -> {
                List<Object> nestedSteps = INCREMENTING_X_STEPS_SETTING_REPLAYABLE(replayableOnBuggyStep1, replayableOnGoodStep2);
                ConfigBag nestedConfig = ConfigBag.newInstance()
                        .configure(WorkflowEffector.EFFECTOR_NAME, "nestedWorkflow")
                        .configure(WorkflowEffector.STEPS, nestedSteps);
                new WorkflowEffector(nestedConfig).apply((EntityLocal) entity);
            },
            ConfigBag.newInstance().configure(WorkflowEffector.REPLAYABLE, replayableFromStartOuter ? "from start" : null));

    // ensure workflow sensor set immediately (this is done synchronously when effector invocation task is created, to ensure it can be resumed)
    findSingleLastWorkflow();

    EntityAsserts.assertAttributeEqualsEventually(app, x, 1);
    Time.sleep(Duration.millis(50)); // give it 50ms to make sure it gets to the waiting step

    // refresh this for good measure (it won't have changed but feels like bad practice to rely on that)
    lastWorkflowContext = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app).values().iterator().next();

    // cancel the workflow
    lastWorkflowContext.getTask(true).get().cancel(true);
    lastInvocation.blockUntilEnded(Duration.seconds(2));

    // check the replay points recorded for the outer and the nested workflow
    Asserts.assertThat(lastWorkflowContext.replayableLastStep, step -> Objects.equals(step, expectedOuterStep));
    Map<String, WorkflowExecutionContext> workflows = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
    Asserts.assertSize(workflows, 2);
    WorkflowExecutionContext nestedWorkflow = workflows.values().stream()
            .filter(candidate -> candidate.getParentTag() != null)
            .findAny()
            .get();
    Asserts.assertThat(nestedWorkflow.replayableLastStep, step -> Objects.equals(step, expectedNestedStep));

    // replay
    Task<Object> invocation2 = DynamicTasks.submit(lastWorkflowContext.factory(false).createTaskReplaying(
            isResuming
                    ? lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", false)
                    : lastWorkflowContext.factory(false).makeInstructionsForReplayingFromLastReplayable("test", false)
    ), app);

    app.sensors().set(Sensors.newBooleanSensor("gate"), true);
    Asserts.assertEquals(invocation2.getUnchecked(), expected);
    EntityAsserts.assertAttributeEquals(app, x, expected);
}
/**
 * A step that fails (unknown effector) with a step-level {@code on-error} handler: the handler's
 * output becomes the workflow result, and the expected sequence of log messages is emitted.
 */
@Test
public void testSimpleErrorHandlerOnStep() throws IOException {
    try (ClassLogWatcher logWatcher = new ClassLogWatcher(getClass().getPackage().getName())) {
        lastInvocation = runSteps(MutableList.of(
                        MutableMap.of("s", "invoke-effector does-not-exist",
                                "output", "should have failed",
                                "on-error", MutableList.of(
                                        MutableMap.of("type", "no-op",
                                                "output", "error-handler worked!")))),
                null);
        Asserts.assertEquals(lastInvocation.getUnchecked(), "error-handler worked!");

        // can have "Blocked by lock on lock-for-incrementor, currently held by JPuhvC9I" from a previous invocation?
        List<String> msgs = logWatcher.getMessages().stream()
                .filter(line -> !line.startsWith("Blocked by lock"))
                .collect(Collectors.toList());
        log.info("Error handler output:\n"+String.join("\n", msgs));

        Asserts.assertEntriesSatisfy(msgs, MutableList.of(
                m -> m.matches("Starting workflow 'myWorkflow .workflow effector.', moving to first step .*-1"),
                m -> m.matches("Starting step .*-1 in task .*"),
                m -> m.matches("Encountered error in step .*-1 '1 - invoke-effector does-not-exist' .handler present.: No effector matching 'does-not-exist'"),
                m -> m.matches("Starting .*-1-error-handler with 1 step in task .*"),
                m -> m.matches("Starting .*-1-error-handler-1 in task .*"),
                m -> m.matches("Completed handler .*-1-error-handler; no next step indicated so proceeding to default next step"),
                m -> m.matches("Completed step .*-1; no further steps: Workflow completed"),
                m -> m.matches("Completed workflow .* successfully; step count: 1 considered, 1 executed") ));

        Map<String, WorkflowExecutionContext> persisted = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
        lastWorkflowContext = persisted.values().iterator().next();
        Asserts.assertEquals(lastWorkflowContext.currentStepIndex, (Integer) 1);
    }
}
/**
 * A failing step with a workflow-level {@code on-error} handler: the handler's output becomes the
 * workflow result, the following step does not run, and the expected log messages are emitted.
 */
@Test
public void testSimpleErrorHandlerOnWorkflow() throws IOException {
    try (ClassLogWatcher logWatcher = new ClassLogWatcher(getClass().getPackage().getName())) {
        lastInvocation = runSteps(MutableList.of(
                        MutableMap.of("s", "invoke-effector does-not-exist",
                                "output", "should have failed"),
                        "log should not run"),
                null,
                ConfigBag.newInstance().configure(
                        WorkflowEffector.ON_ERROR, MutableList.of(
                                MutableMap.of("type", "no-op",
                                        "output", "error-handler worked!"))));
        Asserts.assertEquals(lastInvocation.getUnchecked(), "error-handler worked!");

        // can have "Blocked by lock on lock-for-incrementor, currently held by JPuhvC9I" from a previous invocation?
        List<String> msgs = logWatcher.getMessages().stream()
                .filter(line -> !line.startsWith("Blocked by lock"))
                .collect(Collectors.toList());
        log.info("Error handler output:\n"+String.join("\n", msgs));

        Asserts.assertEntriesSatisfy(msgs, MutableList.of(
                m -> m.matches("Starting workflow 'myWorkflow .workflow effector.', moving to first step .*-1"),
                m -> m.matches("Starting step .*-1 in task .*"),
                m -> m.matches("myWorkflow .*: Error in step '1 - invoke-effector does-not-exist'; rethrowing: No effector matching 'does-not-exist'"),
                m -> m.matches("Error in workflow 'myWorkflow .workflow effector.' around step .*-1, running error handler"),
                m -> m.matches("Encountered error in workflow .*/.* 'myWorkflow' .handler present.: No effector matching 'does-not-exist'"),
                m -> m.matches("Starting .*-error-handler with 1 step in task .*"),
                m -> m.matches("Starting .*-error-handler-1 in task .*"),
                m -> m.matches("Completed handler .*-error-handler; no next step indicated so proceeding to default next step"),
                m -> m.matches("Handled error in workflow around step .*-1; inferred next step 'end': Workflow completed")));
    }
}
/**
 * A workflow-level error handler that sets a sensor and then fails with "rethrow": the invocation
 * must fail with the handler's message while keeping the original error in the causal chain, the
 * sensor must have been set, and the workflow must end in ERROR status.
 */
@Test
public void testSimpleErrorHandlerOnWorkflowFailing() throws IOException {
    ConfigBag onErrorConfig = ConfigBag.newInstance().configure(
            WorkflowEffector.ON_ERROR, MutableList.of("set-sensor had_error = yes", "fail rethrow message rethrown"));
    lastInvocation = runSteps(MutableList.of("invoke-effector does-not-exist"), null, onErrorConfig);

    Asserts.assertFailsWith(() -> lastInvocation.getUnchecked(), e -> {
        Asserts.expectedFailureContains(e, "rethrown");
        // the original cause must still be reachable
        Asserts.assertThat(Exceptions.getCausalChain(e),
                chain -> chain.stream().anyMatch(cause -> cause.toString().contains("does-not-exist")));
        return true;
    });

    AttributeSensor<Object> hadError = Sensors.newSensor(Object.class, "had_error");
    Asserts.assertEquals(app.sensors().get(hadError), "yes");

    findSingleLastWorkflow();
    Asserts.assertEquals(lastWorkflowContext.status, WorkflowExecutionContext.WorkflowStatus.ERROR);
}
/**
 * Exercises a multi-step error handler with conditions, a nested error handler and
 * {@code goto exit}: checks the workflow result and which log messages do and do not appear.
 */
@Test
public void testErrorHandlerListWithGotoExit() throws IOException {
    try (ClassLogWatcher logWatcher = new ClassLogWatcher(getClass().getPackage().getName())) {
        lastInvocation = runSteps(MutableList.of(
                        MutableMap.of("s", "invoke-effector does-not-exist",
                                "output", "NOT returned",
                                "on-error", MutableList.of(
                                        "log Error handler 1-1",
                                        MutableMap.of("step", "log Error handler 1-2",
                                                "output", "from 1-2"),
                                        MutableMap.of("step", "log NOT shown because of condition",
                                                "condition", MutableMap.of("target", "${output}", "equals", "NOT matched")),
                                        MutableMap.of("step", "log Error handler 1-4 has output ${output}",
                                                "condition", MutableMap.of("target", "${output}", "glob", "from *")),
                                        MutableMap.of("step", "log Step created-but-not-logged because of bad variable ${not_available}",
                                                "on-error", MutableList.of(
                                                        MutableMap.of("step", "log Error handler 1-5-1", "output", "from 1-5-1"),
                                                        "goto exit",
                                                        "log NOT shown after inner exit")
                                        ),
                                        "log Error handler 1-6",
                                        "goto exit",
                                        "log NOT shown because of earlier exit")
                        ),
                        "log Step 2 has output ${output}"
                ),
                null);
        Asserts.assertEquals(lastInvocation.getUnchecked(), "from 1-5-1");

        List<String> msgs = logWatcher.getMessages();
        log.info("Error handler output:\n"+String.join("\n", msgs));

        // nothing marked NOT may be logged, and the bad-variable step may only appear in its "Creating handler" message
        Asserts.assertEquals(msgs.stream().filter(s -> s.contains("NOT")).findAny().orElse(null), null);
        Asserts.assertEquals(msgs.stream().filter(s -> s.contains("created-but-not-logged") && !s.contains("Creating handler")).findAny().orElse(null), null);

        // each of these markers must appear in at least one message
        for (String marker : Arrays.asList("1-1", "1-2", "1-4", "1-5-1", "1-6", "Step 2 has output from 1-5-1")) {
            Asserts.assertNotNull(msgs.stream().filter(s -> s.contains(marker)).findAny().orElse(null));
        }
    }
}
/**
 * An error handler whose only step has a condition that does not match: the original failure is
 * expected to propagate, and the workflow's current step index is expected to be 0.
 */
@Test
public void testErrorHandlerRethrows() throws IOException {
    lastInvocation = runSteps(MutableList.of(
                    MutableMap.of("step", "fail message expected exception",
                            "output", "should have failed",
                            "on-error", MutableList.of(
                                    MutableMap.of("step", "return not applicable",
                                            "condition", "not matched")))),
            null);

    Asserts.assertFailsWith(() -> Asserts.fail("Did not fail, returned: "+lastInvocation.getUnchecked()),
            e -> Asserts.expectedFailureContainsIgnoreCase(e, "expected exception"));

    Map<String, WorkflowExecutionContext> persisted = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app);
    lastWorkflowContext = persisted.values().iterator().next();
    Asserts.assertEquals(lastWorkflowContext.currentStepIndex, (Integer) 0);
}
/**
 * Logs an undefined variable so the step fails, and checks that an {@code on-error} handler
 * whose condition is the regex {@code .*InvalidReference.*var_does_not_exist.*} handles the
 * error, making the workflow return {@code "error handled"}.
 */
@Test
public void testMultilineErrorRegex() throws IOException {
    Map<String, Object> regexGuardedHandler = MutableMap.of(
            "step", "return error handled",
            "condition", MutableMap.of("regex", ".*InvalidReference.*var_does_not_exist.*"));
    Map<String, Object> stepWithBadReference = MutableMap.of(
            "step", "log ${var_does_not_exist}",
            "output", "should have failed",
            "on-error", MutableList.of(regexGuardedHandler));

    lastInvocation = runSteps(MutableList.of(stepWithBadReference), null);

    Asserts.assertEquals(lastInvocation.getUnchecked(), "error handled");
}
/** Timeout declared on the step, with the sleep outlasting it: a timeout is expected. */
@Test
public void testTimeoutOnStep() throws Exception {
    final boolean finishesBeforeTimeout = false;
    final boolean onStep = true;
    doTestTimeout(finishesBeforeTimeout, onStep);
}
/** Timeout declared on the workflow (effector config), with the sleep outlasting it: a timeout is expected. */
@Test
public void testTimeoutOnWorkflow() throws Exception {
    final boolean finishesBeforeTimeout = false;
    final boolean onStep = false;
    doTestTimeout(finishesBeforeTimeout, onStep);
}
/** Timeout declared on the step, with the sleep finishing well inside it: normal completion is expected. */
@Test
public void testTimeoutNotExceededOnStep() throws Exception {
    final boolean finishesBeforeTimeout = true;
    final boolean onStep = true;
    doTestTimeout(finishesBeforeTimeout, onStep);
}
/** Timeout declared on the workflow (effector config), with the sleep finishing well inside it: normal completion is expected. */
@Test
public void testTimeoutNotExceededOnWorkflow() throws Exception {
    final boolean finishesBeforeTimeout = true;
    final boolean onStep = false;
    doTestTimeout(finishesBeforeTimeout, onStep);
}
/**
 * Runs a one-step workflow that sleeps, with a timeout set either on the step or on the workflow,
 * and checks both the outcome and the elapsed time.
 * <p>
 * When {@code finishesBeforeTimeout} is true the step sleeps 1s under a 10s timeout and must complete;
 * otherwise it sleeps 10s under a 1s timeout and must fail with a timeout-related exception.
 * In both cases the elapsed time must lie between the shorter and the longer of the two durations,
 * and all tasks in the app's execution context must be done within five seconds afterwards.
 *
 * @param finishesBeforeTimeout whether the sleep is shorter than the timeout
 * @param onStep true to put the timeout on the step; false to set it via {@code WorkflowEffector.TIMEOUT}
 * @throws Exception declared for the convenience of the calling tests
 */
public void doTestTimeout(boolean finishesBeforeTimeout, boolean onStep) throws Exception {
    Stopwatch sw = Stopwatch.createStarted();
    Duration sleepTime = Duration.seconds(finishesBeforeTimeout ? 1 : 10);
    Duration timeout = Duration.seconds(finishesBeforeTimeout ? 10 : 1);

    ConfigBag effConfig = ConfigBag.newInstance();
    Map<String,Object> step = MutableMap.of("s", "sleep "+sleepTime);
    if (onStep) {
        step.put("timeout", ""+timeout);
    } else {
        effConfig.configure(WorkflowEffector.TIMEOUT, timeout);
    }

    try {
        lastInvocation = runSteps(MutableList.of(step), null, effConfig);
        Object result = lastInvocation.getUnchecked();
        if (!finishesBeforeTimeout) {
            throw Asserts.fail("Should have timed out, instead gave: " + result);
        }
    } catch (Exception e) {
        // for a step-level timeout the workflow should always capture it as a TimeoutException;
        // for workflow-level timeouts, per comments in WorkflowExecutionContext where it throws TimeoutException,
        // we cannot easily prevent a CancellationException from being reported
        // (typed local rather than an inline conditional, which avoids needing a raw Class cast)
        Class<? extends Throwable> expectedType = onStep ? TimeoutException.class : CancellationException.class;
        if (finishesBeforeTimeout || Exceptions.getFirstThrowableOfType(e, expectedType)==null) {
            throw Asserts.fail(e);
        }
    }

    // read the stopwatch once so the comparisons and the reported value agree
    Duration elapsed = Duration.of(sw);
    Duration lowerBound = finishesBeforeTimeout ? sleepTime : timeout;
    Duration upperBound = finishesBeforeTimeout ? timeout : sleepTime;
    if (elapsed.isShorterThan(lowerBound)) Asserts.fail("Finished too quick: " + elapsed);
    if (elapsed.isLongerThan(upperBound)) Asserts.fail("Took too long: " + elapsed);

    // nothing should be left running on the app once the workflow has ended or timed out
    Asserts.eventually(() -> ((EntityInternal)app).getExecutionContext().getTasks().stream().filter(t -> !t.isDone()).collect(Collectors.toList()),
            unfinishedTasks -> {
                log.info("TASKS: "+unfinishedTasks);
                log.info(""+lastInvocation);
                return unfinishedTasks.isEmpty();
            }, Duration.FIVE_SECONDS
    );
}
/**
 * Supplies the same workflow-level error handler in three shapes -- a list holding one step map,
 * the step map on its own, and just the step's shorthand string -- and expects each run to be
 * handled (see {@link #doTestFail(Object)}).
 */
@Test
public void testFailAndErrorHandlerAsListOrMapOrString() {
    MutableMap<String, Serializable> handlerAsMap = MutableMap.of(
            "step", "return Yay WTF",
            "condition", MutableMap.of("error-cause", MutableMap.of("regex", ".*Fail.*wtf.*")));
    MutableList<MutableMap<String, Serializable>> handlerAsList = MutableList.of(handlerAsMap);

    doTestFail(handlerAsList);
    doTestFail(handlerAsMap);
    doTestFail(handlerAsMap.get("step"));
}
/**
 * Runs a workflow consisting of the single step {@code fail message wtf} with the given object
 * configured as {@code WorkflowCommonConfig.ON_ERROR}, and asserts the result is {@code "Yay WTF"}.
 *
 * @param errorHandler the error handler definition to install on the workflow
 */
void doTestFail(Object errorHandler) {
    ConfigBag withHandler = ConfigBag.newInstance();
    withHandler.configure(WorkflowCommonConfig.ON_ERROR, errorHandler);

    Task<?> failingRun = runSteps(MutableList.of("fail message wtf"), null, withHandler);

    Asserts.assertEquals(failingRun.getUnchecked(), "Yay WTF");
}
/**
 * Runs a workflow configured with {@code replayable: disabled} that increments the {@code count}
 * sensor and then fails, and exercises replay-from-last-replayable through two factories:
 * <ul>
 * <li>the one from {@code factory(true)} is expected to replay without forcing;
 * <li>the one from {@code factory(false)} is expected to refuse with a message containing
 *     "disabled" unless the replay is forced.
 * </ul>
 * Each successful replay is observed as {@code count} going up by one.
 */
@Test
public void testReplayableDisabled() {
    AttributeSensor<Integer> count = Sensors.newIntegerSensor("count");

    Task<?> firstRun = runSteps(
            MutableList.of("workflow replayable from here", "let x = ${entity.sensor.count} + 1 ?? 1", "set-sensor count = ${x}", "fail message wtf"),
            null,
            ConfigBag.newInstance().configure(WorkflowCommonConfig.REPLAYABLE, "disabled"));
    firstRun.blockUntilEnded();
    EntityAsserts.assertAttributeEquals(app, count, 1);

    WorkflowStatePersistenceViaSensors persistence = new WorkflowStatePersistenceViaSensors(mgmt());
    lastWorkflowContext = persistence.getWorkflows(app).values().iterator().next();

    // factory(true): replay is accepted without forcing
    WorkflowExecutionContext.Factory lenientFactory = lastWorkflowContext.factory(true);
    DynamicTasks.submit(
            lenientFactory.createTaskReplaying(lenientFactory.makeInstructionsForReplayingFromLastReplayable("testing", false)),
            app).blockUntilEnded();
    EntityAsserts.assertAttributeEquals(app, count, 2);

    // factory(false): an unforced replay is rejected and the sensor is left alone
    WorkflowExecutionContext.Factory strictFactory = lastWorkflowContext.factory(false);
    Asserts.assertFailsWith(() -> strictFactory.createTaskReplaying(strictFactory.makeInstructionsForReplayingFromLastReplayable("testing", false)),
            e -> {
                Asserts.expectedFailureContainsIgnoreCase(e, "disabled");
                return true;
            });
    EntityAsserts.assertAttributeEquals(app, count, 2);

    // factory(false) with the replay forced: goes through
    DynamicTasks.submit(
            strictFactory.createTaskReplaying(strictFactory.makeInstructionsForReplayingFromLastReplayable("testing", true)),
            app).blockUntilEnded();
    EntityAsserts.assertAttributeEquals(app, count, 3);
}
/**
 * Fast check of which workflows remain persisted on the app for various retention and hash settings.
 * <p>
 * Each scenario runs a workflow via {@link #doTestRetentionDisabled} (which creates a new effector
 * named {@code "myWorkflow" + effNameCount} each time), or re-invokes the most recently created effector,
 * and then compares the ids of the persisted workflows against the ids expected to be retained.
 */
@Test
public void testRetentionQuickly() throws Exception {
app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
// always use the latest mgmt context!
Supplier<WorkflowStatePersistenceViaSensors> wp = () -> new WorkflowStatePersistenceViaSensors(mgmt());
BrooklynTaskTags.WorkflowTaskTag w1, w2, w3, w4;
// workflow-level retention "context", step-level "min(1,2)" with a fixed hash: expiry resolves to "min(1,2)"
w1 = doTestRetentionDisabled("context", "min(1,2) hash my-fixed-hash", false, false, false);
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, "min(1,2)");
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId()));
// workflow-level retention 2, step-level "min(1,context)" with the same fixed hash:
// resolves to the same expiry, and only this latest run is retained
w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", false, false, false);
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, "min(1,2)");
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId()));
// invoking our test gives a new workflow hash because the effector name is different
w2 = doTestRetentionDisabled(2, "1", false, false, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
// reinvoking that effector still gives 2
Task<?> t = app.invoke(app.getEntityType().getEffectorByName("myWorkflow" + effNameCount).get(), null);
t.blockUntilEnded();
w2 = BrooklynTaskTags.getWorkflowTaskTag(t, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
// hash accepts variables
app.config().set(ConfigKeys.newStringConfigKey("hash"), "my-fixed-hash");
// this hash replaces old w1
w1 = doTestRetentionDisabled("context", "min(1,2) hash ${entity.config.hash}", false, false, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // should replace the one above
// workflow.id as the hash variable means each invocation has its own retention
w3 = doTestRetentionDisabled("context", "1 hash ${workflow.id}", false, false, false);
t = app.invoke(app.getEntityType().getEffectorByName("myWorkflow" + effNameCount).get(), null);
t.blockUntilEnded();
w4 = BrooklynTaskTags.getWorkflowTaskTag(t, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId())); // w3 and w4 are both kept alongside w1 and w2; nothing is replaced here
}
/**
 * Slow (sleeps for 20s in total, plus rebinds) check of retention settings, including "disabled",
 * time-based expiry, and combinations of workflow-level and step-level settings sharing a fixed hash.
 * <p>
 * Scenarios are run through {@link #doTestRetentionDisabled}, several of them with a rebind before or
 * after the step that changes retention; the set of persisted workflow ids is then compared against
 * the expected one, calling {@code expireOldWorkflows} explicitly after each sleep.
 */
@Test(groups="Integration") // very slow
public void testRetentionManyWaysIncludingDisabled() throws Exception {
app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
// always use the latest mgmt context!
Supplier<WorkflowStatePersistenceViaSensors> wp = () -> new WorkflowStatePersistenceViaSensors(mgmt());
BrooklynTaskTags.WorkflowTaskTag w1, w2, w3;
// disabled at workflow level: not persisted (the step-level value is the literal "ignored")
doTestRetentionDisabled("disabled", "ignored", true, false, true);
// disabled by the step, rebinding before that step runs: the workflow is still found and replayed
doTestRetentionDisabled("1", "disabled", true, false, false);
// disabled by the step, rebinding after that step has run: not persisted
doTestRetentionDisabled("1", "disabled", false, true, true);
w1 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false);
// only w1 should be persisted
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId()));
// run something else within 5s, should now be persisting 2
w2 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
// wait 5s and explicitly trigger expiry, which should cause everything to expire
Time.sleep(Duration.FIVE_SECONDS);
wp.get().expireOldWorkflows(app, null);
// should now be empty
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of());
String longWait = "10s";
// three runs sharing the hash "my-fixed-hash", each expressing max(1,10s) in a different way
w1 = doTestRetentionDisabled("1", "hash my-fixed-hash max(context,"+longWait+")", false, true, false);
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, "max(1,"+Duration.of(longWait)+")");
w2 = doTestRetentionDisabled("disabled", "max(1,"+longWait+") hash my-fixed-hash", false, true, false);
Time.sleep(Duration.seconds(5));
w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", "context", false, true, false);
// should now have all 3
wp.get().expireOldWorkflows(app, null);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId()));
Time.sleep(Duration.seconds(5));
// now just the last 1 (only 1 in 10s)
wp.get().expireOldWorkflows(app, null);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w3.getWorkflowId()));
Time.sleep(Duration.seconds(5));
// still have last 1 (even after 10s)
wp.get().expireOldWorkflows(app, null);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w3.getWorkflowId()));
// run two more, that's all we should have
w1 = doTestRetentionDisabled("1", "hash my-fixed-hash", false, true, false);
w2 = doTestRetentionDisabled("1", "context", false, true, false);
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
}
// counter used to give each workflow effector created by doTestRetentionDisabled a distinct name ("myWorkflow" + n)
int effNameCount = 0;
/**
 * Runs a two-phase workflow on the existing app with the given retention settings, optionally rebinding
 * part-way through, and checks whether the workflow is persisted.
 * <p>
 * The workflow waits for sensor {@code a}, applies {@code workflow retention <retentionOnStep>},
 * then waits for sensor {@code b} and returns {@code a + b}. This method clears both sensors up front,
 * sets {@code a} to 10 and {@code b} to 1, and, when persistence is expected, asserts the result is 11
 * (replaying the workflow first if a rebind interrupted it).
 *
 * @param retentionOnWorkflow value configured as {@code WorkflowCommonConfig.RETENTION} on the workflow
 * @param retentionOnStep argument given to the {@code workflow retention} step
 * @param rebindBeforeRetentionChangedOnStep rebind before {@code a} is set, i.e. while the workflow is still waiting ahead of the retention step
 * @param rebindAfterRetentionChangedOnStep rebind after {@code a} is set and before {@code b} is set
 * @param shouldBeDisabled true if the workflow is expected to be absent from persistence; false if it must be present
 * @return the workflow tag of the task originally submitted
 * @throws Exception as declared by the rebind helpers used here
 */
BrooklynTaskTags.WorkflowTaskTag doTestRetentionDisabled(Object retentionOnWorkflow, String retentionOnStep, boolean rebindBeforeRetentionChangedOnStep, boolean rebindAfterRetentionChangedOnStep, boolean shouldBeDisabled) throws Exception {
effNameCount++;
AttributeSensor<Integer> a = Sensors.newIntegerSensor("a");
AttributeSensor<Integer> b = Sensors.newIntegerSensor("b");
// clear any values left by a previous run so the workflow genuinely blocks on each sensor
((EntityInternal.SensorSupportInternal)app.sensors()).remove(a);
((EntityInternal.SensorSupportInternal)app.sensors()).remove(b);
Task<?> out = runStepsOnExistingApp("myWorkflow"+effNameCount,
MutableList.of(
"log wait for a in ${workflow.id} ${workflow.name}",
"wait ${entity.attributeWhenReady.a}",
"log got a",
"let result = ${entity.sensor.a}",
"workflow retention "+retentionOnStep,
"log retention step",
"wait ${entity.attributeWhenReady.b}",
"let result = ${result} + ${entity.sensor.b}",
"return ${result}"),
null,
ConfigBag.newInstance().configure((ConfigKey)WorkflowCommonConfig.RETENTION, retentionOnWorkflow));
BrooklynTaskTags.WorkflowTaskTag wt = BrooklynTaskTags.getWorkflowTaskTag(out, false);
if (rebindBeforeRetentionChangedOnStep) {
// short pause before rebinding -- presumably to let the workflow reach its wait on `a`; TODO confirm
Time.sleep(Duration.millis(500));
app = rebind();
switchOriginalToNewManagementContext();
}
app.sensors().set(a, 10);
if (rebindAfterRetentionChangedOnStep) {
// short pause before rebinding -- presumably to let the retention step run and the workflow reach its wait on `b`; TODO confirm
Time.sleep(Duration.millis(500));
app = rebind();
switchOriginalToNewManagementContext();
}
app.sensors().set(b, 1);
// look the workflow up via the current management context, which may have changed on rebind
Maybe<WorkflowExecutionContext> wf = new WorkflowStatePersistenceViaSensors(mgmt()).getFromTag(wt);
if (shouldBeDisabled) {
Time.sleep(Duration.millis(500));
// NOTE(review): `wf` was looked up before this sleep and is not re-read afterwards -- confirm the sleep is serving its purpose
if (wf.isPresent()) Asserts.fail("Workflow persistence should be disabled, instead found: "+wf.get());
} else {
if (wf.isAbsent()) Asserts.fail("Workflow persistence should be enabled, instead did not find it");
lastWorkflowContext = wf.get();
lastInvocation = lastWorkflowContext.getTask(false).get();
if (rebindAfterRetentionChangedOnStep || rebindBeforeRetentionChangedOnStep) {
// was interrupted, make sure the dangling handler finishes then replay
lastInvocation.blockUntilEnded(Duration.FIVE_SECONDS);
lastInvocation = Entities.submit(app, lastWorkflowContext.factory(false).createTaskReplaying(lastWorkflowContext.factory(false).makeInstructionsForReplayResuming("test", true)));
}
// a (10) + b (1)
Asserts.assertEquals(lastInvocation.getUnchecked(Duration.seconds(5)), 11);
}
return wt;
}
/**
 * Checks that workflows whose definitions cannot be parsed fail early and leave nothing persisted on the app.
 * <p>
 * Three cases are covered: an unparseable step rejected when the effector is added; the same step with a
 * workflow condition, which is accepted at add time but rejected on invocation; and an invalid
 * {@code workflow replayable} mode submitted via {@code WorkflowBasicTest.runWorkflow}.
 */
@Test
public void testParseErrorStatusAndRetention() throws Exception {
app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
Supplier<WorkflowStatePersistenceViaSensors> wp = () -> new WorkflowStatePersistenceViaSensors(mgmt());
// non-parseable step fails at add time, and is not persisted
try {
WorkflowEffector eff = addEffector("e1", MutableList.of(
"non-parseable-step"),
null,
ConfigBag.newInstance().configure((ConfigKey) WorkflowCommonConfig.RETENTION, 1));
Asserts.shouldHaveFailedPreviously("Instead had: "+eff);
} catch (Exception e) {
Asserts.expectedFailureContains(e, "non-parseable-step");
}
// nothing persisted
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of());
// with condition, it does not parse until actual call
addEffector("e1", MutableList.of(
"non-parseable-step"),
null,
ConfigBag.newInstance()
.configure((ConfigKey) WorkflowCommonConfig.CONDITION, "any: []")
.configure((ConfigKey) WorkflowCommonConfig.RETENTION, 1)
);
// but fails on invocation, before task created
try {
Task<?> task = app.invoke(app.getEntityType().getEffectorByName("e1").get(), null);
Asserts.shouldHaveFailedPreviously("Instead had: " + task.getStatusDetail(true));
} catch (Exception e) {
Asserts.expectedFailureContains(e, "non-parseable-step");
}
// nothing persisted
Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of());
// invalid replayable step also fails before persisted
// NOTE(review): unlike the two cases above, this try block has no Asserts.shouldHaveFailedPreviously(...),
// so the test still passes if runWorkflow returns normally; only the persistence check below guards this case.
// Confirm whether runWorkflow is expected to throw synchronously before adding such a guard.
try {
WorkflowBasicTest.runWorkflow(app,
Strings.lines(
"steps:",
"- workflow replayable unknown-replayable-mode"),
"e2");
} catch (Exception e) {
Asserts.expectedFailureContains(e, "unknown-replayable-mode");
}
Asserts.assertSize(wp.get().getWorkflows(app).keySet(), 0);
// (above is the result after fixing bug where some parses happened in the task, and again in the error handler,
// and didn't properly handle errors; now all parse errors should prevent workflow creation;
// there might be other edge cases where workflows remain in "staged" state, but they should be few if any,
// and worst case they can be manually deleted)
}
/**
 * Runs a workflow whose second step is a nested workflow: the nested workflow's step 2-2 fails, the
 * step-level handler fails again, and the nested workflow's own {@code on-error} then returns
 * {@code "2-done"}. Checks the error and error-handler bookkeeping recorded at each level:
 * <ul>
 * <li>the outer step has no error and no step-level error handler task;
 * <li>the nested workflow ends in {@code SUCCESS} and records an error handler task;
 * <li>the failing inner step records both its error and its error handler task.
 * </ul>
 */
@Test
public void testErrorInSubWorkflowCaughtUpdatesContextAndStep() throws Exception {
    app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
    WorkflowExecutionContext run = WorkflowBasicTest.runWorkflow(app, Strings.lines(
            "steps:",
            "- log 1",
            "- type: workflow",
            " steps:",
            " - log 2-1",
            " - step: fail message 2-2",
            " on-error:",
            " - log 2-2-error",
            " - fail message 2-2-done",
            " - log 2-3",
            " on-error: ",
            " - return 2-done",
            "- log 3"
    ), "test-error-in-subworkflow");
    Asserts.assertEquals(run.getTask(true).get().getUnchecked(), "2-done");

    // outer level: the step that ran the nested workflow
    WorkflowExecutionContext.OldStepRecord step2 = run.oldStepInfo.get(1);
    Asserts.assertNotNull(step2);
    Asserts.assertNotNull(step2.context);
    Asserts.assertNull(step2.context.getError()); // should be null because handled
    Asserts.assertNull(step2.context.errorHandlerTaskId); // should be null because not treated as a step handler, but handler for the workflow - step2sub.errorHandlerTaskId

    // nested workflow level
    BrooklynTaskTags.WorkflowTaskTag step2subTag = Iterables.getOnlyElement(step2.context.getSubWorkflows());
    WorkflowExecutionContext step2sub = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app).get(step2subTag.getWorkflowId());
    Asserts.assertEquals(step2sub.getStatus(), WorkflowExecutionContext.WorkflowStatus.SUCCESS);
    Asserts.assertNotNull(step2sub.errorHandlerTaskId);

    // failing step inside the nested workflow (a duplicated null-check on step22 was dropped here)
    WorkflowExecutionContext.OldStepRecord step22 = step2sub.oldStepInfo.get(1);
    Asserts.assertNotNull(step22);
    Asserts.assertNotNull(step22.context);
    Asserts.assertNotNull(step22.context.getError()); // not null because not handled here
    Asserts.assertNotNull(step22.context.errorHandlerTaskId);
}
/**
 * Counterpart of the "caught" sub-workflow test, without an {@code on-error} on the nested workflow:
 * step 2-2 fails, its step-level handler fails again, and nothing handles that. Both the outer and
 * the nested workflow are expected to end in {@code ERROR}, with the failing inner step recording
 * its error and its error handler task.
 */
@Test
public void testErrorInSubWorkflowUncaughtUpdatesContextAndStep() throws Exception {
    app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));

    String workflowYaml = Strings.lines(
            "steps:",
            "- log 1",
            "- type: workflow",
            " steps:",
            " - log 2-1",
            " - step: fail message 2-2",
            " on-error:",
            " - log 2-2-error",
            " - fail message 2-2-done",
            " - log 2-3",
            "- log 3"
    );
    WorkflowExecutionContext outer = WorkflowBasicTest.runWorkflow(app, workflowYaml, "test-error-in-subworkflow");
    outer.getTask(true).get().blockUntilEnded();
    Asserts.assertEquals(outer.getStatus(), WorkflowExecutionContext.WorkflowStatus.ERROR);

    // locate the nested workflow through the outer step which launched it
    WorkflowExecutionContext.OldStepRecord outerStep = outer.oldStepInfo.get(1);
    BrooklynTaskTags.WorkflowTaskTag nestedTag = Iterables.getOnlyElement(outerStep.context.getSubWorkflows());
    WorkflowStatePersistenceViaSensors persistence = new WorkflowStatePersistenceViaSensors(mgmt());
    WorkflowExecutionContext nested = persistence.getWorkflows(app).get(nestedTag.getWorkflowId());
    Asserts.assertEquals(nested.getStatus(), WorkflowExecutionContext.WorkflowStatus.ERROR);

    // the failing step inside the nested workflow
    WorkflowExecutionContext.OldStepRecord failedInnerStep = nested.oldStepInfo.get(1);
    Asserts.assertNotNull(failedInnerStep);
    Asserts.assertNotNull(failedInnerStep.context);
    Asserts.assertNotNull(failedInnerStep.context.getError());
    Asserts.assertNotNull(failedInnerStep.context.errorHandlerTaskId);
}
}