blob: 07a635336bbcf7bdfbf417c3a2b3488244ef277f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow.steps.flow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.workflow.*;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.QuotedStringTokenizer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.brooklyn.core.workflow.WorkflowExecutionContext.STEP_TARGET_NAME_FOR_END;
import static org.apache.brooklyn.core.workflow.WorkflowExecutionContext.STEP_TARGET_NAME_FOR_LAST;
public class RetryWorkflowStep extends WorkflowStepDefinition {
/** Logger used for the debug and warning messages emitted while computing the retry. */
private static final Logger log = LoggerFactory.getLogger(RetryWorkflowStep.class);
/**
 * Shorthand template passed to {@code populateFromShorthandTemplate}. It names the {@code replay}, {@code next},
 * {@code limit}, {@code backoff} and {@code timeout} inputs, each in an optional bracketed clause.
 * NOTE(review): the exact template syntax is interpreted by the shorthand processor, which is not visible in this file.
 */
public static final String SHORTHAND = "[ ?${replay} \"replay\" ] [ \" from \" ${next} ] [ \" limit \" ${limit...} ] [ \" backoff \" ${backoff...} ] [ \" timeout \" ${timeout} ]";
/** Input key {@code replay}: whether the retry replays the workflow; when unset the choice is derived from {@code next}. */
public static final ConfigKey<RetryReplayOption> REPLAY = ConfigKeys.newConfigKey(RetryReplayOption.class, "replay");
/** Input key {@code limit}: the retry limits; the step fails with {@link RetriesExceeded} as soon as any one of them is reached. */
public static final ConfigKey<List<RetryLimit>> LIMIT = ConfigKeys.newConfigKey(new TypeToken<List<RetryLimit>>() {}, "limit");
/** Input key {@code backoff}: the delay to wait before each retry. */
public static final ConfigKey<RetryBackoff> BACKOFF = ConfigKeys.newConfigKey(RetryBackoff.class, "backoff");
// If multiple retry steps declare the same hash key, their retry counts are combined; this is useful when the same
// error might be handled in different ways. When no hash is given, the workflow step reference is used as the key.
// Note that the limit and backoff _instructions_ apply only at the step where they are defined, so they may need
// to be defined at each step.
public static final ConfigKey<String> HASH = ConfigKeys.newStringConfigKey("hash");
/**
 * How a retry continues: {@code FALSE} goes to the {@code next} step without replaying; {@code TRUE} and {@code FORCE}
 * create replay instructions, with {@code FORCE} additionally passed as the "force" flag when those instructions are made.
 */
public enum RetryReplayOption { TRUE, FALSE, FORCE }
/**
 * A limit on retries, expressed as a maximum count, a maximum duration, or a maximum count within a
 * (sliding) duration. Either field may be null, meaning that aspect is unconstrained.
 */
public static class RetryLimit {
    /** Number of retries (within {@link #duration}, if that is set) at which the limit is reached; null for no count limit. */
    public Integer count;
    /** Time window for the limit; with no {@link #count} it bounds the age of the oldest retry; null for no time limit. */
    public Duration duration;

    /** Creates a count-only limit by delegating to {@link #fromString(String)} with the integer's string form. */
    public static RetryLimit fromInteger(Integer i) {
        return fromString("" + i);
    }

    /**
     * Parses a limit of the form {@code ${count}}, {@code ${duration}}, or {@code ${count} in ${duration}}.
     *
     * @param s the expression to parse
     * @return the parsed limit
     * @throws IllegalStateException if the expression is null, blank, or not a parseable count or duration
     * @throws NumberFormatException if the count in a {@code ${count} in ${duration}} expression is not an integer
     */
    public static RetryLimit fromString(String s) {
        RetryLimit result = new RetryLimit();
        // a null expression is treated like a blank one, so it is reported with the descriptive message below rather than an NPE
        String[] parts = s == null ? new String[0] : s.trim().split(" +");
        if (parts.length >= 3 && "in".equals(parts[1])) {
            result.count = Integer.parseInt(parts[0]);
            // everything after "in" is the duration, which may itself contain spaces
            result.duration = Duration.of(String.join(" ", Arrays.asList(parts).subList(2, parts.length)));
        } else {
            Pair<Integer, Duration> parse = null;
            Exception problem = null;
            try {
                parse = parseCountOrDuration(s);
            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                problem = e;
            }
            if (parse == null) {
                throw new IllegalStateException("Illegal expression for retry limit, should be '${count}', '${duration}', or '${count} in ${duration}': " + s, problem);
            }
            result.count = parse.getLeft();
            result.duration = parse.getRight();
        }
        return result;
    }

    /**
     * Parses a phrase as either a count or a duration: a phrase ending in a letter is parsed as a duration
     * (returned on the right), anything else as an integer count (returned on the left).
     *
     * @return the pair with exactly one side populated, or null if the phrase is null or blank
     * @throws NumberFormatException if the phrase does not end in a letter and is not an integer
     */
    public static Pair<Integer, Duration> parseCountOrDuration(String phrase) {
        if (phrase == null) return null;
        phrase = phrase.trim();
        if (phrase.isEmpty()) return null;
        if (Character.isLetter(phrase.charAt(phrase.length() - 1))) return Pair.of(null, Duration.parse(phrase));
        return Pair.of(Integer.parseInt(phrase), null);
    }

    /**
     * Checks this limit against the instants of the retries recorded so far.
     *
     * @param retries instants at which earlier retries were recorded
     * @return a present value holding a description if the limit is reached, otherwise absent
     */
    public Maybe<String> isReached(List<Instant> retries) {
        Instant now = Instant.now();
        if (count == null) {
            if (duration == null) return Maybe.absent("No limit");
            // duration-only limit: reached once the oldest retry is older than the duration
            Optional<Instant> oldest = retries.stream().min(Instant::compareTo);
            if (oldest.isPresent()) {
                Duration sinceOldest = Duration.between(oldest.get(), now);
                if (duration.isShorterThan(sinceOldest)) {
                    String retryCount = retries.size() == 1 ? "1 retry" : retries.size() + " retries";
                    return Maybe.of(retryCount + " since " + sinceOldest + " ago (limit " + this + ")");
                }
            }
        } else {
            // count limit: only retries inside the duration window (all of them if there is no duration) are counted
            List<Instant> filtered = retries.stream()
                    .filter(r -> duration == null || duration.isLongerThan(Duration.between(r, now)))
                    .collect(Collectors.toList());
            if (filtered.size() >= count) {
                if (filtered.isEmpty()) return Maybe.of("Max count 0 reached");
                String summary;
                if (filtered.size() < retries.size()) {
                    summary = retries.size() + " retries total, " + filtered.size();
                } else {
                    summary = (retries.size() == 1 ? "1 retry" : retries.size() + " retries") + " total";
                }
                // the first filtered entry is reported as the start of the window (assumes retries are in chronological order)
                return Maybe.of(summary + " since " + Duration.between(filtered.get(0), now) + " ago (limit " + this + ")");
            }
        }
        return Maybe.absent("Limit not reached");
    }

    /** Renders the limit in the same form accepted by {@link #fromString(String)}, or a placeholder if nothing is set. */
    @Override
    public String toString() {
        if (count != null && duration != null) return count + " in " + duration;
        if (count != null) return "" + count;
        if (duration != null) return "" + duration;
        return "RetryLimit<unset>";
    }
}
/**
 * Describes the delay to apply before a retry: one or more initial delays, an optional growth rule
 * (a multiplicative {@code factor} or an additive {@code increase}), an optional random {@code jitter}
 * fraction, and an optional {@code max} delay.
 */
public static class RetryBackoff {
    List<Duration> initial;
    Double factor;
    Duration increase;
    Double jitter;
    Duration max;

    /** Sets the sequence of initial delays. */
    public void setInitial(List<Duration> initial) {
        this.initial = initial;
    }

    /** Sets a single initial delay, parsed from its string form. */
    public void setInitial(String initial) {
        this.initial = MutableList.of(Duration.of(initial));
    }

    /**
     * Parses a backoff from shorthand, eg: "0 0 100ms increasing 100% up to 1m".
     * The "increasing" value is a multiplier when it ends in {@code x}, a percentage growth when it ends in {@code %},
     * and otherwise a duration added each time; the "jitter" value is a fraction, or a percentage when it ends in {@code %}.
     *
     * @throws IllegalArgumentException if the shorthand does not match or no initial duration is given
     */
    public static RetryBackoff fromString(String s) {
        Maybe<Map<String, Object>> parsed = new ShorthandProcessor("${initial...} [ \" increasing \" ${factor} ] [ \" up to \" ${max}] [ \" jitter \" ${jitter} ]").process(s);
        if (parsed.isAbsent()) {
            throw new IllegalArgumentException("Invalid shorthand expression for backoff: '"+s+"'", Maybe.Absent.getException(parsed));
        }
        Map<String, Object> fields = parsed.get();
        RetryBackoff backoff = new RetryBackoff();

        // initial delays: a space-separated (optionally quoted) list of durations
        String initialText = TypeCoercions.coerce(fields.get("initial"), String.class);
        if (Strings.isBlank(initialText)) {
            throw new IllegalArgumentException("initial duration required for backoff");
        }
        List<String> initialTokens = QuotedStringTokenizer.builder()
                .includeQuotes(true)
                .includeDelimiters(false)
                .expectQuotesDelimited(true)
                .failOnOpenQuote(true)
                .build(initialText)
                .remainderAsList();
        backoff.initial = initialTokens.stream().map(token -> Duration.of(token)).collect(Collectors.toList());

        // growth rule
        String growth = (String) fields.get("factor");
        if (growth != null) {
            growth = growth.trim();
            if (growth.endsWith("x")) {
                backoff.factor = TypeCoercions.coerce(Strings.removeFromEnd(growth, "x"), Double.class);
            } else if (growth.endsWith("%")) {
                backoff.factor = 1 + TypeCoercions.coerce(Strings.removeFromEnd(growth, "%"), Double.class) / 100;
            } else {
                backoff.increase = Duration.of(growth);
            }
        }

        backoff.max = TypeCoercions.coerce(fields.get("max"), Duration.class);

        // jitter, as a fraction or a percentage
        String jitterText = (String) fields.get("jitter");
        if (jitterText != null) {
            jitterText = jitterText.trim();
            if (jitterText.endsWith("%")) {
                double percentage = TypeCoercions.coerce(Strings.removeFromEnd(jitterText, "%").trim(), Double.class);
                backoff.jitter = percentage / 100;
            } else {
                backoff.jitter = TypeCoercions.coerce(jitterText, Double.class);
            }
        }
        return backoff;
    }
}
/**
 * Populates this step from its shorthand form using {@link #SHORTHAND}, then normalizes the parsed inputs:
 * a string limit is parsed into a single-element list of {@link RetryLimit}; {@code next} and {@code timeout}
 * are moved from the input map onto the step's own fields; and a {@code replay} value of false is dropped.
 *
 * @throws IllegalStateException if the limit is neither a string nor a list
 */
@Override
public void populateFromShorthand(String expression) {
    populateFromShorthandTemplate(SHORTHAND, expression);

    Object rawLimit = input.remove(LIMIT.getName());
    if (rawLimit instanceof String) {
        setInput(LIMIT, MutableList.of(RetryLimit.fromString((String) rawLimit)));
    } else if (rawLimit instanceof List) {
        setInput(LIMIT, (List) rawLimit);
    } else if (rawLimit != null) {
        throw new IllegalStateException("Invalid value for limit: " + rawLimit);
    }

    String parsedNext = TypeCoercions.coerce(input.remove("next"), String.class);
    if (Strings.isNonBlank(parsedNext)) {
        this.next = parsedNext;
    }

    Duration parsedTimeout = TypeCoercions.coerce(input.remove("timeout"), Duration.class);
    if (parsedTimeout != null) {
        this.timeout = parsedTimeout;
    }

    // remove replay=false
    String replayKey = REPLAY.getName();
    if (Boolean.FALSE.equals(input.get(replayKey))) {
        input.remove(replayKey);
    }
}
/**
 * Runs the inherited validation, then coerces the {@code replay}, {@code limit} and {@code backoff} inputs
 * to their declared types; the coerced values are discarded, as the coercions are done only so that an
 * invalid value can fail here.
 */
@Override
public void validateStep(WorkflowStepResolution workflowStepResolution) {
    super.validateStep(workflowStepResolution);
    checkCoercible(REPLAY);
    checkCoercible(LIMIT);
    checkCoercible(BACKOFF);
}

/** Coerces the raw input stored under the key's name to the key's declared type, discarding the result. */
private void checkCoercible(ConfigKey<?> key) {
    TypeCoercions.coerce(input.get(key.getName()), key.getTypeToken());
}
/**
 * Always returns null: the meaning of the stock step 'timeout' field is overridden here, because as retry is quick
 * it has no timeout of its own. The configured value is instead exposed by {@link #getMaximumRetryTimeout()},
 * where it bounds the time elapsed since the oldest recorded retry.
 *
 * @return null, always
 */
@JsonIgnore @Override
public Duration getTimeout() {
return null;
}
/**
 * Returns the value that the superclass reports as the step timeout. In this step it is used as the maximum
 * time allowed since the oldest recorded retry: once that is exceeded, the step body fails with a
 * {@link TimeoutException} instead of retrying.
 *
 * @return the superclass timeout value, possibly null
 */
@JsonIgnore
public Duration getMaximumRetryTimeout() {
return super.getTimeout();
}
/**
 * Thrown by the retry step when one of its {@link RetryLimit}s is reached; the message describes the limit
 * that was reached, and the cause, where supplied, is the error the retry step was given.
 */
public static class RetriesExceeded extends RuntimeException {
public RetriesExceeded(String message) { super(message); }
public RetriesExceeded(String message, Throwable cause) { super(message, cause); }
public RetriesExceeded(Throwable cause) { super(cause); }
}
/**
 * Performs one retry. In order, it:
 * <ol>
 * <li> looks up the retries already recorded under this step's hash (the {@code hash} input, or else the workflow step reference);
 * <li> fails with {@link RetriesExceeded} if any configured limit is reached, or with a wrapped {@link TimeoutException}
 *      if the time since the oldest retry exceeds {@link #getMaximumRetryTimeout()};
 * <li> sleeps for the configured backoff delay, if any;
 * <li> records the new retry, and sets {@code context.next} either to replay instructions or to the explicit next step.
 * </ol>
 *
 * @return the previous step's output, unchanged
 */
@Override
protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
    WorkflowExecutionContext workflow = context.getWorkflowExectionContext();
    String hash = Strings.firstNonBlank(context.getInput(HASH), workflow.getWorkflowStepReference(Tasks.current()));
    List<Instant> retries = workflow.getRetryRecords().compute(hash, (k, v) -> v != null ? v : MutableList.of());

    failIfLimitReached(context, retries);
    failIfRetryTimeoutExceeded(context, retries);

    RetryBackoff backoff = context.getInput(BACKOFF);
    if (backoff != null) {
        sleepBeforeRetry(computeBackoffDelay(backoff, retries.size()), retries.size() + 1);
    }

    retries.add(Instant.now());
    workflow.getRetryRecords().put(hash, retries);

    applyRetryTarget(context, workflow);
    return context.getPreviousStepOutput();
}

/** Throws {@link RetriesExceeded}, caused by the context's error, for the first configured limit that is reached. */
private void failIfLimitReached(WorkflowStepInstanceExecutionContext context, List<Instant> retries) {
    List<RetryLimit> limits = context.getInput(LIMIT);
    if (limits == null) return;
    for (RetryLimit limit : limits) {
        Maybe<String> reachedMessage = limit.isReached(retries);
        if (reachedMessage.isPresent()) throw new RetriesExceeded(reachedMessage.get(), context.getError());
    }
}

/** Throws a wrapped {@link TimeoutException} if the oldest recorded retry is older than the maximum retry timeout. */
private void failIfRetryTimeoutExceeded(WorkflowStepInstanceExecutionContext context, List<Instant> retries) {
    Duration maxTimeout = getMaximumRetryTimeout();
    if (maxTimeout == null) return;
    Instant oldest = retries.stream().min(Instant::compareTo).orElse(null);
    if (oldest == null) return;
    Duration sinceFirst = Duration.between(oldest, Instant.now());
    if (sinceFirst.isLongerThan(maxTimeout)) {
        throw Exceptions.propagate(new TimeoutException("Workflow duration of "+sinceFirst+" exceeds timeout of "+maxTimeout).initCause(context.getError()));
    }
}

/**
 * Computes the delay before the next retry, given how many retries have been recorded so far.
 * While there are unused entries in {@code initial}, the entry at index {@code retryCount} is used as is;
 * after that the last initial entry is grown once per retry beyond the initial list, by multiplying by
 * {@code factor} and/or adding {@code increase}. Jitter and the maximum are then applied.
 */
private static Duration computeBackoffDelay(RetryBackoff backoff, int retryCount) {
    Duration delay;
    int growthSteps = 0;
    if (backoff.initial != null && !backoff.initial.isEmpty()) {
        int initialCount = backoff.initial.size();
        if (initialCount > retryCount) {
            delay = backoff.initial.get(retryCount);
        } else {
            delay = backoff.initial.get(initialCount - 1);
            growthSteps = 1 + retryCount - initialCount;
        }
    } else {
        // shouldn't be possible
        delay = Duration.ZERO;
    }

    // The growth count must not be consumed by the factor loop: decrementing it in place left it at -1,
    // which made the additive increase below subtract one increment whenever a factor was also set.
    if (backoff.factor != null) {
        for (int i = 0; i < growthSteps; i++) delay = delay.multiply(backoff.factor);
    }
    if (backoff.increase != null) delay = delay.add(backoff.increase.multiply(growthSteps));

    if (backoff.jitter != null) delay = delay.multiply(1 + Math.random() * backoff.jitter);
    if (backoff.max != null && delay.isLongerThan(backoff.max)) {
        delay = backoff.max;
        // also apply a sigmoidal heuristic if jitter requested
        if (backoff.jitter != null) delay = delay.multiply(1 / (1 + Math.random() * backoff.jitter));
    }
    return delay;
}

/** Sleeps for the given delay, if positive, publishing blocking details on the current task while waiting. */
private static void sleepBeforeRetry(Duration delay, int retryNumber) {
    if (!delay.isPositive()) return;
    try {
        Tasks.withBlockingDetails("Waiting " + delay + " before retry #" + retryNumber, () -> {
            log.debug("Waiting " + delay + " before retry #" + retryNumber);
            Time.sleep(delay);
            return null;
        });
    } catch (Exception e) {
        throw Exceptions.propagate(e);
    }
}

/**
 * Sets {@code context.next} for the retry. If {@code replay} is not specified, it defaults to replaying when no
 * {@code next} is given (from `end` inside an error handler, otherwise from `last`) and to not replaying when
 * {@code next} is given. Without replay, {@code next} is resolved and used as the step to go to; with replay,
 * replay instructions are made for the target.
 */
private void applyRetryTarget(WorkflowStepInstanceExecutionContext context, WorkflowExecutionContext workflow) {
    boolean inErrorHandler = !context.equals(workflow.getCurrentStepInstance());
    RetryReplayOption replay = context.getInput(REPLAY);
    String target = this.next;
    if (replay == null) {
        replay = target == null ? RetryReplayOption.TRUE : RetryReplayOption.FALSE;
        if (target == null) target = inErrorHandler ? STEP_TARGET_NAME_FOR_END : STEP_TARGET_NAME_FOR_LAST;
    } else if (target == null) {
        target = STEP_TARGET_NAME_FOR_LAST;
    }
    // from here on target is never null: every path above that saw a null target assigned one

    if (replay == RetryReplayOption.FALSE) {
        // will go to next by id
        context.next = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, target);
        log.debug("Retrying from explicit next step '"+context.next+"'");
        return;
    }

    boolean force = replay == RetryReplayOption.FORCE;
    String stepReference = workflow.getWorkflowStepReference(Tasks.current());
    context.next = null;

    if (STEP_TARGET_NAME_FOR_END.equals(target)) {
        if (!inErrorHandler) {
            log.warn("Retry target `"+STEP_TARGET_NAME_FOR_END+"` is only permitted inside an error handler; using `"+STEP_TARGET_NAME_FOR_LAST+"` instead");
            target = STEP_TARGET_NAME_FOR_LAST;
        } else {
            context.next = workflow.factory(true).makeInstructionsForReplayResuming(
                    "Retry replay from '" + target + "' per step " + stepReference, force);
        }
    }

    if (context.next == null) {
        if (STEP_TARGET_NAME_FOR_LAST.equals(target)) {
            Integer replayableLastStep = workflow.getReplayableLastStep();
            int lastReplayStep = replayableLastStep != null ? replayableLastStep : WorkflowExecutionContext.STEP_INDEX_FOR_START;
            if (!inErrorHandler && context.getStepIndex() == lastReplayStep) {
                // can't replay from retry step
                lastReplayStep = WorkflowReplayUtils.findNearestReplayPoint(workflow, lastReplayStep, false);
            }
            context.next = workflow.factory(true).makeInstructionsForReplayingFromStep(lastReplayStep,
                    "Retry replay per step " + stepReference, force);
            // could offer retry resuming but that is often not wanted; instead do that if `next` is `end`
        } else {
            context.next = workflow.factory(true).makeInstructionsForReplayingFromStep(workflow.getIndexOfStepId(target).get().getLeft(),
                    "Retry replay from '" + target + "' per step " + stepReference, force);
        }
    }
    log.debug("Retrying with "+context.next);
}
/**
 * Reports this step as idempotent by default.
 * NOTE(review): how the workflow framework uses this flag is defined in the superclass, not visible here.
 *
 * @return true, always
 */
@Override protected Boolean isDefaultIdempotent() { return true; }
}