blob: a5dbe86b0f43a1bdd400e0a13dfa44428481f463 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow.utils;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration.WorkflowRetentionSettings;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import javax.annotation.Nullable;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class WorkflowRetentionParser {
/*
tracks a number and/or duration to indicate how many and how long workflows should be retained in memory after completion;
can take:
* a number, to indicate how many instances of a workflow should be kept
* a duration, to indicate for how long workflows should be kept
* `forever`, to never expire
* `context`, to use the previous retention values (often used together with `max`)
* `parent`, to use the value of any parent workflow or else the system default
* `system`, to use the system default (from brooklyn.properties)
* `min(<value>, <value>, ...)` or `max(<value>, <value>, ...)` of any of the expressions on this line or above (but not `disabled` or `hash`)
* `disabled`, to prevent persistence of a workflow, causing less work for the system where workflows don't need to be stored; such workflows will not be replayable by an operator or recoverable on failover
the semantics of `min` and `max` are
* `min` means completed workflow instances must only be retained if they meet all the constraints implied by the `<value>` arguments, i.e. `min(2, 3, 1h, 2h)` means only the most recent two instances need to be kept and only if it has been less than an hour since they completed
* `max` means completed workflow instances must be retained if they meet any of the constraints implied by the `<value>` arguments, i.e. `max(2, 3, 1h, 2h)` means to keep the 3 most recent instances irrespective of when they run, and to keep all instances for up to two hours
also allows a `hash <value>` to be set at the start or the end
also allows `hard` at start or end, or `soft [limit]` at end
*/
public static WorkflowRetentionSettings parse(String retentionExpression, @Nullable WorkflowExecutionContext context) {
WorkflowRetentionSettings result = new WorkflowRetentionSettings();
if (Strings.isBlank(retentionExpression)) return result;
retentionExpression = retentionExpression.trim().toLowerCase();
do {
if (retentionExpression.startsWith("hash ")) {
if (result.hash != null)
throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression");
retentionExpression = Strings.removeFromStart(retentionExpression, "hash").trim();
result.hash = Strings.getFirstWord(retentionExpression);
retentionExpression = retentionExpression.substring(result.hash.length()).trim();
continue;
}
if (retentionExpression.startsWith("hard ")) {
if (result.softExpiry != null)
throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression");
retentionExpression = Strings.removeFromStart(retentionExpression, "hard").trim();
result.softExpiry = "0";
continue;
}
// TODO soft/hard keyword; take whichever occurs last
if (retentionExpression.contains(" hash ")) {
if (result.hash != null)
throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression");
int i = retentionExpression.indexOf(" hash ");
result.hash = Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim();
retentionExpression = retentionExpression.substring(0, i).trim();
continue;
}
break;
} while (false);
if (retentionExpression.equals("disabled")) {
result.disabled = true;
} else {
if (Strings.isNonBlank(result.hash) && context!=null) {
result.hash = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, result.hash, String.class);
}
if (Strings.isBlank(retentionExpression)) return result;
result.expiry = retentionExpression;
// catch parse errors now; fn won't be accessible without a workflow execution context however
new WorkflowRetentionParser(result.expiry).parse();
}
return result;
}
public interface WorkflowRetentionFilter extends Function<Collection<WorkflowExecutionContext>,Collection<WorkflowExecutionContext>> {
default WorkflowRetentionFilter init(WorkflowExecutionContext context) { return this; }
}
static class KeepAll implements WorkflowRetentionFilter {
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
return workflowExecutionContexts;
}
@Override
public String toString() {
return "forever";
}
}
static class KeepMax implements WorkflowRetentionFilter {
private final List<WorkflowRetentionFilter> values;
KeepMax(List<WorkflowRetentionFilter> values) { this.values = values; }
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
return values.stream().map(v -> v.apply(workflowExecutionContexts)).reduce(MutableSet.of(), (t1, t2) -> { t1.addAll(t2); return t1; });
}
@Override
public String toString() {
return "max("+values.stream().map(Object::toString).collect(Collectors.joining(","))+")";
}
@Override
public WorkflowRetentionFilter init(WorkflowExecutionContext context) {
values.forEach(v -> v.init(context));
return WorkflowRetentionFilter.super.init(context);
}
}
static class KeepMin implements WorkflowRetentionFilter {
private final List<WorkflowRetentionFilter> values;
KeepMin(List<WorkflowRetentionFilter> values) { this.values = values; }
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
List<Collection<WorkflowExecutionContext>> workflowsToKeep = values.stream().map(v -> v.apply(workflowExecutionContexts)).collect(Collectors.toList());
Iterator<Collection<WorkflowExecutionContext>> wi = workflowsToKeep.iterator();
if (!wi.hasNext()) return workflowExecutionContexts;
Set<WorkflowExecutionContext> intersection = MutableSet.copyOf(wi.next());
while (wi.hasNext()) intersection.retainAll(wi.next());
return intersection;
}
@Override
public String toString() {
return "min("+values.stream().map(Object::toString).collect(Collectors.joining(","))+")";
}
@Override
public WorkflowRetentionFilter init(WorkflowExecutionContext context) {
values.forEach(v -> v.init(context));
return WorkflowRetentionFilter.super.init(context);
}
}
static class KeepCount implements WorkflowRetentionFilter {
private final int count;
KeepCount(int count) { this.count = count; }
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
if (workflowExecutionContexts.size()>count) {
return workflowExecutionContexts.stream().sorted(Comparator.comparing(WorkflowExecutionContext::getLastStatusChangeTime).reversed()).limit(count).collect(Collectors.toList());
} else {
return workflowExecutionContexts;
}
}
@Override
public String toString() {
return ""+count;
}
}
static class KeepDuration implements WorkflowRetentionFilter {
private final Duration duration;
KeepDuration(Duration duration) { this.duration = duration; }
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
Instant expiry = Instant.now().minus(duration.toMilliseconds()+1, ChronoUnit.MILLIS);
return workflowExecutionContexts.stream().filter(c -> c.getLastStatusChangeTime().isAfter(expiry)).collect(Collectors.toList());
}
@Override
public String toString() {
return ""+duration;
}
}
static abstract class KeepDelegate implements WorkflowRetentionFilter {
WorkflowRetentionFilter delegate;
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
if (delegate==null) throw new IllegalStateException("Not initialized");
return delegate.apply(workflowExecutionContexts);
}
@Override
public final WorkflowRetentionFilter init(WorkflowExecutionContext workflow) {
if (delegate==null) delegate = findDelegate(workflow);
return this;
}
protected abstract WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow);
}
static class KeepSystem extends KeepDelegate {
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow==null) throw new IllegalStateException("Retention 'system' cannot be used here");
return new WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT)).parse().init(null);
}
@Override
public String toString() {
return "system";
}
}
public static WorkflowRetentionFilter newDefaultFilter() {
return new KeepParent();
}
public static WorkflowRetentionFilter newDefaultSoftFilter() {
return new KeepSystem();
}
static class KeepParent extends KeepDelegate {
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow == null) throw new IllegalStateException("Retention 'parent' cannot be used here");
else if (workflow.getParent()!=null) {
return workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent());
} else {
return new KeepSystem().init(workflow);
}
}
@Override
public String toString() {
return "parent";
}
}
static class KeepContext extends KeepDelegate {
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow == null) throw new IllegalStateException("Retention 'context' cannot be used here");
// expands to string to something that doesn't reference context so that this does not infinitely recurse
return workflow.getRetentionSettings().getExpiryFn(workflow);
}
@Override
public String toString() {
return delegate==null ? "context" : delegate.toString();
}
}
String fullExpression;
String rest;
public WorkflowRetentionParser(String fullExpression) {
this.fullExpression = fullExpression;
}
public WorkflowRetentionFilter parse() {
if (Strings.isBlank(fullExpression)) return newDefaultFilter();
rest = Strings.trimStart(fullExpression.toLowerCase());
WorkflowRetentionFilter result = parseTerm();
if (!Strings.isBlank(rest)) return newDefaultFilter();
return result;
}
Maybe<WorkflowRetentionFilter> eatFn(String word, Function<List<WorkflowRetentionFilter>, WorkflowRetentionFilter> fn) {
if (eatNA(word)) {
List<WorkflowRetentionFilter> args = parseGroupedList(false);
return Maybe.of(fn.apply(args));
}
return Maybe.absent();
}
public <T> T notNull(T value, String message) {
if (value!=null) return value;
throw error(message);
}
public boolean eat(String word) {
return eat(word, false);
}
public boolean eat(String word, boolean requireNextNonAlpha) {
if (rest.startsWith(word)) {
rest = rest.substring(word.length());
if (requireNextNonAlpha && !rest.isEmpty() && Character.isJavaIdentifierPart(rest.charAt(0))) {
rest = word + rest;
return false;
}
rest = Strings.trimStart(rest);
return true;
}
return false;
}
public boolean eatNA(String word) {
return eat(word, true);
}
protected RuntimeException error(String prefix) {
throw new IllegalArgumentException(prefix + " at position "+(fullExpression.length() - rest.length()));
}
public List<WorkflowRetentionFilter> parseGroupedList(boolean consumedStart) {
if (!consumedStart) {
if (!eat("(")) throw new IllegalStateException("Expected '('");
}
List<WorkflowRetentionFilter> terms = MutableList.of();
while (true) {
WorkflowRetentionFilter expr = parseTerm();
if (expr==null) break;
terms.add(expr);
if (!eat(",")) break;
}
if (!eat(")")) throw new IllegalStateException("Expected ')'");
return terms;
}
public WorkflowRetentionFilter parseTerm() {
Maybe<? extends WorkflowRetentionFilter> term;
term = eatFn("min", KeepMin::new);
if (term.isPresent()) return term.get();
term = eatFn("max", KeepMax::new);
if (term.isPresent()) return term.get();
if (eatNA("all") || eatNA("forever")) return new KeepAll();
if (eatNA("system")) return new KeepSystem();
if (eatNA("parent")) return new KeepParent();
if (eatNA("context")) return new KeepContext();
int i = maxPositive(rest.indexOf(","), rest.indexOf(")"));
if (i==-1) i = rest.length();
String nextWord = rest.substring(0, i).trim();
rest = rest.substring(i);
if (nextWord.matches("[0-9]+")) {
return new KeepCount(Integer.parseInt(nextWord));
} else {
try {
return new KeepDuration(Duration.of(nextWord));
} catch (Exception e) {
throw error("Expected a valid retention term, instead had '"+nextWord+"'");
}
}
}
private int maxPositive(int i1, int i2) {
if (i1<0) return i2;
if (i2<0) return i1;
return Math.min(i1, i2);
}
}