blob: 03686deb82e09f64e09762c041c587a729a3d796 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow;
import com.google.common.base.Stopwatch;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.workflow.steps.flow.RetryWorkflowStep;
import org.apache.brooklyn.entity.stock.BasicApplication;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
public class WorkflowRetryTest extends RebindTestFixture<BasicApplication> {
private static final Logger log = LoggerFactory.getLogger(WorkflowRetryTest.class);
private BasicApplication app;
@Override
protected LocalManagementContext decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
WorkflowBasicTest.addWorkflowStepTypes(mgmt);
return super.decorateOrigOrNewManagementContext(mgmt);
}
@Override
protected BasicApplication createApp() {
return null;
}
Task<?> runStep(Object step, Consumer<BasicApplication> appFunction) {
return runSteps(MutableList.<Object>of(step), appFunction);
}
Task<?> runSteps(List<?> steps, Consumer<BasicApplication> appFunction) {
return runSteps(steps, appFunction, null);
}
Task<?> runSteps(List<?> steps, Consumer<BasicApplication> appFunction, ConfigBag initialEffectorConfig) {
BasicApplication app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
this.app = app;
WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
.configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
.configure(WorkflowEffector.STEPS, (List) steps)
.copy(initialEffectorConfig)
);
if (appFunction!=null) appFunction.accept(app);
eff.apply((EntityLocal)app);
return app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
}
private List<Map<String,Object>> basicSteps() {
return MutableList.of(
MutableMap.of(
"s", "let integer x = ${x} + 1 ?? 0",
"id", "one",
"replayable", "from here"),
MutableMap.of(
"s", "retry",
"limit", MutableList.of(5),
"condition",
MutableMap.of("target", "${x}",
"less-than", 3)
));
}
private List<Map<String,Object>> basicSteps(Consumer<List<Map<String,Object>>> tweaks) {
List<Map<String, Object>> steps = basicSteps();
tweaks.accept(steps);
return steps;
}
@Test
public void testRetryWithNext() {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).put("next", "one")), null,
ConfigBag.newInstance().configure(WorkflowEffector.OUTPUT, "${x}"));
Asserts.assertEquals(lastInvocation.getUnchecked(), 3);
}
@Test
public void testRetryWithExplicitReplayReachesMax() {
// replay resets the workflow vars so it keeps setting x = 0
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("replay", "true"))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
}
@Test
public void testRetryReplayByDefaultReachesMax() {
try {
Task<?> lastInvocation = runSteps(basicSteps(), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
}
@Test
public void testRetryWithReplayExplicitNextReachesMax() {
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("replay", "true", "next", "one"))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
}
private void makeNonReplayableNonIdempotent(Map x) {
x.remove("replayable");
x.put("idempotent", "no");
}
@Test
public void testNonreplayableRetryFails() {
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> makeNonReplayableNonIdempotent(l.get(0))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureContainsIgnoreCase(e, "not replayable");
}
}
@Test
public void testRetryWithReplayExplicitNextForcedReachesMax() {
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> {
makeNonReplayableNonIdempotent(l.get(0));
l.get(1).putAll(/* force retry replay from step 1 */ MutableMap.of("replay", "force", "next", "one"));
}), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
}
@Test
public void testRetryWithReplayModeInvalidNiceError() {
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("replay", "bogus", "next", "one"))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureContainsIgnoreCase(e, "invalid", "bogus", "expected one of", "true", "false", "force" );
}
}
@Test(groups="Integration") // because slow
public void testRetryWithExponentialBackoffPercentage() {
doTestRetryWithExponentialBackoff("300%");
}
@Test(groups="Integration") // because slow
public void testRetryWithExponentialBackoffTimes() {
doTestRetryWithExponentialBackoff("4x");
}
void doTestRetryWithExponentialBackoff(String value) {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("backoff", "0 0 100ms increasing "+value))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
long EXPECTED_DELAY = 0 + 0 + 100 + 400 + 1600;
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY)));
long GRACE = 1500;
Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.millis(EXPECTED_DELAY + GRACE)));
}
@Test(groups="Integration") // because slow
public void testRetryWithLinearBackoff() {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("backoff", "0 0 100ms increasing 100ms"))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
long EXPECTED_DELAY = 0 + 0 + 100 + 200 + 300;
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY)));
long GRACE = 1500;
Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.millis(EXPECTED_DELAY + GRACE)));
}
@Test(groups="Integration") // because slow
public void testRetryWithBackoffAndJitter() {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> l.get(1).putAll(MutableMap.of("backoff",
MutableMap.of("initial", "50ms", "jitter", 5)))), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "limit 5");
}
long EXPECTED_DELAY = 50 * 5;
// jitter will be up to 5x the above, averaging 3x, unlikely to be less than 2x
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY*2)));
long GRACE = 500;
Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.millis(EXPECTED_DELAY*10 + GRACE)));
}
@Test(groups="Integration") // because slow
public void testRetryWithLimitsInTime() {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> {
l.get(1).putAll(MutableMap.of("limit", "4 in 300ms", "backoff", "100ms 100ms 100ms 100ms 100ms 0"));
}), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailureOfType(e, RetryWorkflowStep.RetriesExceeded.class);
Asserts.expectedFailureContainsIgnoreCase(e, "retries total,", "limit 4 in 300ms");
}
long EXPECTED_DELAY = 100*5;
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY)));
long GRACE = 1000;
Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.millis(EXPECTED_DELAY + GRACE)));
}
@Test(groups="Integration") // because slow
public void testRetryWithTimeout() {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(basicSteps(l -> {
l.get(1).putAll(MutableMap.of("timeout", "300ms", "backoff", "100ms"));
}), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailure(e);
Asserts.assertNotNull(Exceptions.getFirstThrowableOfType(e, TimeoutException.class), "Exception "+e);
}
long EXPECTED_DELAY = 300;
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY)));
long GRACE = 1000;
Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.millis(EXPECTED_DELAY + GRACE)));
}
@Test(groups="Integration") // because slow
public void testRetryWithBackoffUpToAndLimit() {
Stopwatch sw = Stopwatch.createStarted();
try {
Task<?> lastInvocation = runSteps(MutableList.of(
"let integer x = ${entity.sensor.x} ?? 0",
"let x = ${x} + 1",
"set-sensor x = ${x}",
"retry from start limit 1s backoff 1ms increasing 2x up to 32ms"), null);
Asserts.shouldHaveFailedPreviously("Instead got "+lastInvocation.getUnchecked());
} catch (Exception e) {
Asserts.expectedFailure(e);
Asserts.assertNotNull(Exceptions.getFirstThrowableOfType(e, RetryWorkflowStep.RetriesExceeded.class), "Exception "+e);
}
long EXPECTED_DELAY = 999;
Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.millis(EXPECTED_DELAY)));
Integer x = app.sensors().get(Sensors.newIntegerSensor("x"));
Asserts.assertThat(x, xx -> xx > 12); // shouldn't keep doubling beyond 32 ms
Asserts.assertThat(x, xx -> xx < 40); // but should increase
}
Task<?> lastInvocation;
@Test
public void testRetryInWorkflowOnError() {
doTestRetryOnError(true);
}
@Test
public void testRetryInStepOnError() {
doTestRetryOnError(false);
}
void doTestRetryOnError(boolean onWorkflow) {
// replay resets the workflow vars so it keeps setting x = 0
// will keep
Thread t = new Thread(() -> {
ConfigBag effectorConfig = ConfigBag.newInstance().configureStringKey("replayable", "from start");
Object failingStep = "let no_count = ${entity.sensor.no_count} + 1";
if (onWorkflow) effectorConfig.configureStringKey("on-error", MutableList.of("retry replay backoff 10ms"));
else failingStep = MutableMap.of("s", failingStep, "on-error", MutableList.of("retry replay backoff 10ms"));
lastInvocation = runSteps(MutableList.of(
"let count = ${entity.sensor.count} ?? 0",
"let count = ${count} + 1",
"set-sensor count = ${count}",
failingStep, // this will throw until we set the sensor
"set-sensor no_count = ${no_count}"),
null,
effectorConfig);
log.info("Invocation completed with: "+lastInvocation.getUnchecked());
});
lastInvocation = null;
t.start();
while (lastInvocation==null) Time.sleep(Duration.millis(10));
EntityAsserts.assertAttributeEventually(app, Sensors.newIntegerSensor("count"), v -> v!=null && v > 1);
Asserts.assertFalse(lastInvocation.isDone());
log.info("setting sensor so workflow can proceed");
app.sensors().set(Sensors.newIntegerSensor("no_count"), -1);
lastInvocation.getUnchecked(Duration.ONE_SECOND);
EntityAsserts.assertAttributeEquals(app, Sensors.newIntegerSensor("no_count"), 0);
// shouldn't have taken more than 5s
EntityAsserts.assertAttribute(app, Sensors.newIntegerSensor("count"), v -> v < 50);
}
}