blob: 4c2c667d592bd03695d84526d95a616aed6db4c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.core.workflow.store;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
public class WorkflowRetentionAndExpiration {
private static final Logger log = LoggerFactory.getLogger(WorkflowRetentionAndExpiration.class);
public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT = ConfigKeys.newStringConfigKey("workflow.retention.default",
"Default retention for workflows", "3");
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static class WorkflowRetentionSettings {
public Boolean disabled;
public String hash;
public String expiry;
public String expiryResolved;
// soft expiry refers to what is kept in memory
public String softExpiry;
public String softExpiryResolved;
@JsonIgnore
private transient WorkflowRetentionParser.WorkflowRetentionFilter expiryFn;
public WorkflowRetentionParser.WorkflowRetentionFilter getExpiryFn(WorkflowExecutionContext w) {
return init(w).expiryFn;
}
public WorkflowRetentionSettings init(WorkflowExecutionContext w) {
if (w.getParent()!=null && Boolean.TRUE.equals(w.getParent().getRetentionSettings().disabled)) {
disabled = true;
} else if (expiryFn==null) {
expiryResolved = expiryResolved!=null ? expiryResolved : expiry;
expiryFn = new WorkflowRetentionParser(expiryResolved).parse();
if (w != null) {
Set<String> set = INIT_REENTRANT.get();
if (set==null) {
set = MutableSet.of();
INIT_REENTRANT.set(set);
}
if (!set.add(w.getWorkflowId()+":"+expiryResolved)) {
// double-check we don't cause endless loops; see KeepContext notes
throw new IllegalStateException("Invalid workflow retention '"+expiryResolved+"' as it refers to itself");
}
try {
expiryFn = expiryFn.init(w);
} finally {
set.remove(w.getWorkflowId()+":"+expiryResolved);
if (set.isEmpty()) INIT_REENTRANT.remove();
}
}
expiryResolved = expiryFn.toString(); // remove any references to `context` that might trigger an infinite loop
}
return this;
}
public void updateFrom(WorkflowRetentionSettings r2) {
if (Strings.isNonBlank(r2.hash)) this.hash = r2.hash;
this.disabled = Boolean.TRUE.equals(r2.disabled) ? true : null;
if (Strings.isNonEmpty(r2.expiry)) {
this.expiry = r2.expiry;
this.expiryFn = r2.expiryFn;
this.expiryResolved = r2.expiryResolved;
}
}
}
static ThreadLocal<Set<String>> INIT_REENTRANT = new ThreadLocal<Set<String>>();
static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
Set<String> workflowHashesToUpdate = optionalContext!=null ? MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), "empty-expiry-hash")) //should always be set
: v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet());
workflowHashesToUpdate.forEach(k -> {
List<WorkflowExecutionContext> finishedTwins = v.values().stream()
.filter(c -> k.equals(c.getRetentionHash()))
.filter(c -> isExpirable(c))
.filter(c -> !c.equals(optionalContext))
.collect(Collectors.toList());
if (finishedTwins.isEmpty()) return;
Optional<WorkflowExecutionContext> existingRetentionExpiry = finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != null).findAny();
WorkflowRetentionParser.WorkflowRetentionFilter expiry;
if (useSoftlyKeptExpiry) {
expiry = WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next());
} else if (existingRetentionExpiry.isPresent()) {
// log if expiry fn differs for the same hash
// (but note if it refers to parents, invocations from different places could result in different expiry functions)
if (optionalContext!=null && optionalContext.getRetentionHash().equals(k)) {
if (optionalContext.getRetentionSettings().expiry != null) {
if (!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry)) {
log.warn("Retention specification for " + optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is different for same hash. " +
"Expiry should be constant within a hash but " + existingRetentionExpiry.get() + " has '" + existingRetentionExpiry.get().getRetentionSettings().expiry + "'");
}
}
}
expiry = existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get());
} else {
expiry = WorkflowRetentionParser.newDefaultFilter().init(finishedTwins.iterator().next());
}
Collection<WorkflowExecutionContext> retainedFinishedTwins = expiry.apply(finishedTwins);
if (retainedFinishedTwins.size() < finishedTwins.size()) {
MutableSet<WorkflowExecutionContext> toRemove = MutableSet.copyOf(finishedTwins);
toRemove.removeAll(retainedFinishedTwins);
toRemove.forEach(w -> {
log.debug("Expiring old workflow " + w + " as there are "+retainedFinishedTwins.size()+" more recent ones also completed");
deleteWorkflowFromMap(v, w, true, false);
});
}
});
return v;
}
static boolean deleteWorkflowFromMap(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext w, boolean andAllReplayTasks, boolean andSoftlyKept) {
boolean removed = v.remove(w.getWorkflowId()) != null;
if (andSoftlyKept) removed = WorkflowStateActiveInMemory.get(w.getManagementContext()).deleteWorkflow(w) || removed;
if (andAllReplayTasks) {
BasicExecutionManager em = ((BasicExecutionManager) w.getManagementContext().getExecutionManager());
w.getReplays().forEach(wr -> {
Task<?> wrt = em.getTask(wr.getTaskId());
if (wrt != null) em.deleteTask(wrt, false, true);
});
}
return removed;
}
private static boolean isExpirable(WorkflowExecutionContext c) {
if (c.getStatus() == null || !c.getStatus().expirable) return false;
if (c.getParent()!=null) {
// XXX for size reasons, should skip this - fow now, don't expire children workflows unless parents are also expirable
if (!isExpirable(c.getParent())) return false;
// we could weaken this if we have lots of children workflows, but that is more work; left as an enhancement
}
return true;
}
static boolean isExpirationCheckNeeded(Entity entity) {
if (Tasks.isAncestor(Tasks.current(), t -> BrooklynTaskTags.getTagsFast(t).contains(BrooklynTaskTags.ENTITY_INITIALIZATION))) {
// skip expiry during initialization
return false;
}
if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
// skip expiry during shutdown
return false;
}
return true;
}
public static void expireOldWorkflows(Entity entity) {
new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, true, true, null, null);
}
}