allow soft retention to be specified, globally or per workflow/step
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 8cffdcb..f026023 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -633,17 +633,12 @@
return ((EntityInternal)getEntity()).getManagementContext();
}
- @JsonIgnore
- protected WorkflowStatePersistenceViaSensors getPersister() {
- return new WorkflowStatePersistenceViaSensors(getManagementContext());
- }
-
public void persist() {
if (isInErrorHandlerSubWorkflow()) {
// currently don't persist error handler sub-workflows
return;
}
- getPersister().checkpoint(this);
+ WorkflowRetentionAndExpiration.checkpoint(getManagementContext(), this);
}
/** Get the value of the input. Supports Brooklyn DSL resolution but NOT Freemarker resolution. */
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
index 4c2c667..852c75d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
@@ -18,9 +18,20 @@
*/
package org.apache.brooklyn.core.workflow.store;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
@@ -28,6 +39,7 @@
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors.PersistenceWithQueuedTasks;
import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
@@ -36,15 +48,51 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.stream.Collectors;
-
public class WorkflowRetentionAndExpiration {
private static final Logger log = LoggerFactory.getLogger(WorkflowRetentionAndExpiration.class);
public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT = ConfigKeys.newStringConfigKey("workflow.retention.default",
- "Default retention for workflows", "3");
+ "Default retention for workflows (persisted)", "3");
+ public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT_SOFT = ConfigKeys.newStringConfigKey("workflow.retention.default.soft",
+ "Default soft retention for workflows (in-memory)", "3");
+
+ public static void checkpoint(ManagementContext mgmt, WorkflowExecutionContext context) {
+ Entity entity = context.getEntity();
+ if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
+ log.debug("Skipping persistence of "+context+" as entity is no longer active here");
+ return;
+ }
+
+ doGlobalUpdateIfNeededOnDiskAndInMemory(mgmt);
+
+ new WorkflowStatePersistenceViaSensors(mgmt).checkpoint(context, PersistenceWithQueuedTasks.WARN);
+
+ // keep active workflows in memory, even if disabled
+ WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context);
+ }
+
+ static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out old workflows
+
+ static void doGlobalUpdateIfNeededOnDiskAndInMemory(ManagementContext mgmt) {
+ WorkflowStateActiveInMemory inMem = WorkflowStateActiveInMemory.get(mgmt);
+
+ if (inMem.lastGlobalClear + GLOBAL_UPDATE_FREQUENCY > System.currentTimeMillis()) return;
+ inMem.lastGlobalClear = System.currentTimeMillis();
+
+ AtomicInteger total = new AtomicInteger(0);
+ Collection<Entity> entities = mgmt.getEntityManager().getEntities();
+ entities.forEach(entity -> {
+ // on disk
+ int change = new WorkflowStatePersistenceViaSensors(mgmt).expireOldWorkflowsOnDisk(entity, null);
+ if (change!=0) log.debug("Global entity workflow persistence update, removed "+(-change)+" workflows from "+entity);
+ total.addAndGet(change);
+
+ // in memory
+ inMem.recomputeExpiration(entity, null);
+ });
+ if (total.get()!=0) log.debug("Global entity workflow persistence update, removed "+(-total.get())+" workflows across all "+entities.size()+" entities");
+ }
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static class WorkflowRetentionSettings {
@@ -60,35 +108,64 @@
@JsonIgnore
private transient WorkflowRetentionParser.WorkflowRetentionFilter expiryFn;
+ @JsonIgnore
+ private transient WorkflowRetentionParser.WorkflowRetentionFilter softExpiryFn;
public WorkflowRetentionParser.WorkflowRetentionFilter getExpiryFn(WorkflowExecutionContext w) {
return init(w).expiryFn;
}
+ public WorkflowRetentionParser.WorkflowRetentionFilter getSoftExpiryFn(WorkflowExecutionContext w) {
+ return init(w).softExpiryFn;
+ }
public WorkflowRetentionSettings init(WorkflowExecutionContext w) {
if (w.getParent()!=null && Boolean.TRUE.equals(w.getParent().getRetentionSettings().disabled)) {
disabled = true;
- } else if (expiryFn==null) {
- expiryResolved = expiryResolved!=null ? expiryResolved : expiry;
- expiryFn = new WorkflowRetentionParser(expiryResolved).parse();
- if (w != null) {
- Set<String> set = INIT_REENTRANT.get();
- if (set==null) {
- set = MutableSet.of();
- INIT_REENTRANT.set(set);
+ } else {
+ if (expiryFn == null) {
+ expiryResolved = expiryResolved != null ? expiryResolved : expiry;
+ expiryFn = new WorkflowRetentionParser(expiryResolved).parse();
+ if (w != null) {
+ Set<String> set = INIT_REENTRANT.get();
+ if (set == null) {
+ set = MutableSet.of();
+ INIT_REENTRANT.set(set);
+ }
+ if (!set.add(w.getWorkflowId() + ":" + expiryResolved)) {
+ // double-check we don't cause endless loops; see KeepContext notes
+ throw new IllegalStateException("Invalid workflow retention '" + expiryResolved + "' as it refers to itself");
+ }
+ try {
+ expiryFn = expiryFn.init(w);
+ } finally {
+ set.remove(w.getWorkflowId() + ":" + expiryResolved);
+ if (set.isEmpty()) INIT_REENTRANT.remove();
+ }
}
- if (!set.add(w.getWorkflowId()+":"+expiryResolved)) {
- // double-check we don't cause endless loops; see KeepContext notes
- throw new IllegalStateException("Invalid workflow retention '"+expiryResolved+"' as it refers to itself");
- }
- try {
- expiryFn = expiryFn.init(w);
- } finally {
- set.remove(w.getWorkflowId()+":"+expiryResolved);
- if (set.isEmpty()) INIT_REENTRANT.remove();
- }
+ expiryResolved = expiryFn.toString(); // remove any references to `context` that might trigger an infinite loop
}
- expiryResolved = expiryFn.toString(); // remove any references to `context` that might trigger an infinite loop
+ if (softExpiryFn == null) {
+ softExpiryResolved = softExpiryResolved != null ? softExpiryResolved : softExpiry;
+ softExpiryFn = new WorkflowRetentionParser(softExpiryResolved).soft().parse();
+ if (w != null) {
+ Set<String> set = INIT_REENTRANT.get();
+ if (set == null) {
+ set = MutableSet.of();
+ INIT_REENTRANT.set(set);
+ }
+ if (!set.add(w.getWorkflowId() + ":" + softExpiryResolved)) {
+ // double-check we don't cause endless loops; see KeepContext notes
+ throw new IllegalStateException("Invalid workflow retention '" + softExpiryResolved + "' as it refers to itself");
+ }
+ try {
+ softExpiryFn = softExpiryFn.init(w);
+ } finally {
+ set.remove(w.getWorkflowId() + ":" + softExpiryResolved);
+ if (set.isEmpty()) INIT_REENTRANT.remove();
+ }
+ }
+ softExpiryResolved = softExpiryFn.toString(); // remove any references to `context` that might trigger an infinite loop
+ }
}
return this;
}
@@ -101,42 +178,67 @@
this.expiryFn = r2.expiryFn;
this.expiryResolved = r2.expiryResolved;
}
+ if (Strings.isNonEmpty(r2.softExpiry)) {
+ this.softExpiry = r2.softExpiry;
+ this.softExpiryFn = r2.softExpiryFn;
+ this.softExpiryResolved = r2.softExpiryResolved;
+ }
}
}
static ThreadLocal<Set<String>> INIT_REENTRANT = new ThreadLocal<Set<String>>();
- static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
+ static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, @Nullable WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
Set<String> workflowHashesToUpdate = optionalContext!=null ? MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), "empty-expiry-hash")) //should always be set
: v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet());
workflowHashesToUpdate.forEach(k -> {
+ // if optional context supplied, perhaps only recompute for that hash
+ if (optionalContext!=null && !k.equals(optionalContext.getRetentionHash())) {
+ if (!isExpirable(optionalContext)) {
+ return;
+ } else {
+ // no-op -- if it is expirable, do a full recompute for the entity, to ensure sub-workflows are no longer retained
+ // (cross-entity subworkflows will not be cleaned up; they will get collected when another workflow runs there,
+ // or when there is a global cleanup event)
+ }
+ }
+
List<WorkflowExecutionContext> finishedTwins = v.values().stream()
.filter(c -> k.equals(c.getRetentionHash()))
.filter(c -> isExpirable(c))
- .filter(c -> !c.equals(optionalContext))
.collect(Collectors.toList());
if (finishedTwins.isEmpty()) return;
- Optional<WorkflowExecutionContext> existingRetentionExpiry = finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != null).findAny();
+ Function<WorkflowRetentionSettings,String> expiryAccessor = useSoftlyKeptExpiry ? wrs -> wrs.softExpiry : wrs -> wrs.expiry;
+ Optional<WorkflowExecutionContext> existingRetentionExpiry;
+ if (optionalContext!=null && k.equals(optionalContext.getRetentionHash()) && expiryAccessor.apply(optionalContext.getRetentionSettings())!=null)
+ existingRetentionExpiry = Optional.of(optionalContext);
+ else
+ existingRetentionExpiry = finishedTwins.stream().filter(w -> expiryAccessor.apply(w.getRetentionSettings()) != null).findAny();
+
WorkflowRetentionParser.WorkflowRetentionFilter expiry;
- if (useSoftlyKeptExpiry) {
- expiry = WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next());
- } else if (existingRetentionExpiry.isPresent()) {
+
+ if (existingRetentionExpiry.isPresent()) {
// log if expiry fn differs for the same hash
// (but note if it refers to parents, invocations from different places could result in different expiry functions)
- if (optionalContext!=null && optionalContext.getRetentionHash().equals(k)) {
- if (optionalContext.getRetentionSettings().expiry != null) {
- if (!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry)) {
- log.warn("Retention specification for " + optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is different for same hash. " +
- "Expiry should be constant within a hash but " + existingRetentionExpiry.get() + " has '" + existingRetentionExpiry.get().getRetentionSettings().expiry + "'");
+ // (no such warning for the soft side of it)
+ if (useSoftlyKeptExpiry) {
+ expiry = existingRetentionExpiry.get().getRetentionSettings().getSoftExpiryFn(existingRetentionExpiry.get());
+ } else {
+ if (optionalContext != null && optionalContext.getRetentionHash().equals(k)) {
+ if (optionalContext.getRetentionSettings().expiry != null) {
+ if (!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry)) {
+ log.warn("Retention specification for " + optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is different for same hash. " +
+ "Expiry should be constant within a hash but " + existingRetentionExpiry.get() + " has '" + existingRetentionExpiry.get().getRetentionSettings().expiry + "'");
+ }
}
}
+ expiry = existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get());
}
- expiry = existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get());
} else {
- expiry = WorkflowRetentionParser.newDefaultFilter().init(finishedTwins.iterator().next());
+ expiry = WorkflowRetentionParser.newDefaultFilter(useSoftlyKeptExpiry).init(finishedTwins.iterator().next());
}
Collection<WorkflowExecutionContext> retainedFinishedTwins = expiry.apply(finishedTwins);
@@ -169,12 +271,16 @@
private static boolean isExpirable(WorkflowExecutionContext c) {
if (c.getStatus() == null || !c.getStatus().expirable) return false;
- if (c.getParent()!=null) {
- // XXX for size reasons, should skip this - fow now, don't expire children workflows unless parents are also expirable
- if (!isExpirable(c.getParent())) return false;
- // we could weaken this if we have lots of children workflows, but that is more work; left as an enhancement
+ // should we expire of completed children workflows even if an ancestor workflow is not expirable?
+ // this would prevent silly retention of workflows whose parents are about to end, where the retention check runs on the child just before the parent finishes;
+ // however it would also limit the ability to inspect children workflows e.g. in a foreach block, as only e.g. 3 of the children would be kept ever.
+ // on balance, do NOT expire those; wait for another event to trigger their clean-up.
+ // (if the child workflow is marked disabled, it is not persisted, but takes effect in all other cases.)
+ if (c.getParent()!=null) {
+ if (!isExpirable(c.getParent())) return false;
}
+
return true;
}
@@ -192,6 +298,6 @@
public static void expireOldWorkflows(Entity entity) {
- new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, true, true, null, null);
+ new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, null, true, true, true, null, null);
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index 88fe231..0083b16 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import javax.annotation.Nullable;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -30,7 +31,6 @@
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
-import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.slf4j.Logger;
@@ -42,7 +42,10 @@
public static final ConfigKey<WorkflowStateActiveInMemory> IN_MEMORY_WORKFLOWS = ConfigKeys.newConfigKey(WorkflowStateActiveInMemory.class, "internals.brooklyn.workflow.in_memory");
- private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out workflows from old entities
+ long lastInMemEntitiesClear = System.currentTimeMillis();
+
+ // this applies to both sensors and active, but is stored here as this instance is kept on the mgmt context
+ long lastGlobalClear = System.currentTimeMillis();
public static WorkflowStateActiveInMemory get(ManagementContext mgmt) {
WorkflowStateActiveInMemory localActiveWorkflows = mgmt.getScratchpad().get(IN_MEMORY_WORKFLOWS);
@@ -69,10 +72,8 @@
this.mgmt = mgmt;
}
- long lastInMemClear = System.currentTimeMillis();
-
public void expireAbsentEntities() {
- lastInMemClear = System.currentTimeMillis();
+ lastInMemEntitiesClear = System.currentTimeMillis();
Set<String> copy;
synchronized (active) { copy = MutableSet.copyOf(active.keySet()); }
synchronized (completedSoftlyKept) { copy.addAll(completedSoftlyKept.keySet()); }
@@ -89,11 +90,12 @@
if (context.getStatus().expirable) {
withActiveForEntity(context.getEntity().getId(), false, wfm -> wfm.remove(context.getWorkflowId()));
withSoftlyKeptForEntity(context.getEntity().getId(), true, wfm -> { wfm.put(context.getWorkflowId(), context); return null; });
+ recomputeExpiration(context.getEntity(), context);
} else {
// keep active workflows in memory, even if disabled
withActiveForEntity(context.getEntity().getId(), true, wfm -> wfm.put(context.getWorkflowId(), context));
}
- if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) {
+ if (lastInMemEntitiesClear + WorkflowRetentionAndExpiration.GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) {
// poor man's cleanup, every minute, but good enough
expireAbsentEntities();
}
@@ -160,9 +162,9 @@
return result;
}
- public void recomputeExpiration(Entity entity) {
+ public void recomputeExpiration(Entity entity, @Nullable WorkflowExecutionContext optionalContext) {
withSoftlyKeptForEntity(entity.getId(), false, wfm -> {
- WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), null, true);
+ WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), optionalContext, true);
return null;
});
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 97c37fc..ffd1698 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -18,6 +18,14 @@
*/
package org.apache.brooklyn.core.workflow.store;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -25,7 +33,6 @@
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
@@ -35,15 +42,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
public class WorkflowStatePersistenceViaSensors {
private static final Logger log = LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class);
@@ -52,8 +50,6 @@
public static final AttributeSensor<Map<String,WorkflowExecutionContext>> INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow");
- private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out old workflows
-
public static WorkflowStatePersistenceViaSensors get(ManagementContext mgmt) {
WorkflowStatePersistenceViaSensors sharedInstance = mgmt.getScratchpad().get(SENSOR_WORKFLOW_PERSISTER);
if (sharedInstance==null) {
@@ -76,27 +72,11 @@
enum PersistenceWithQueuedTasks { ALLOW, WARN, FAIL }
- long lastInMemClear = System.currentTimeMillis();
-
- public void checkpoint(WorkflowExecutionContext context) {
- checkpoint(context, PersistenceWithQueuedTasks.WARN);
- }
- public void checkpoint(WorkflowExecutionContext context, PersistenceWithQueuedTasks expectQueuedTasks) {
- doGlobalUpdateIfNeeded();
-
- Entity entity = context.getEntity();
- if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
- log.debug("Skipping persistence of "+context+" as entity is no longer active here");
- return;
- }
-
- // keep active workflows in memory, even if disabled
- WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context);
-
+ protected void checkpoint(WorkflowExecutionContext context, PersistenceWithQueuedTasks expectQueuedTasks) {
if (Boolean.TRUE.equals(context.getRetentionSettings().disabled)) {
if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false, false)!=null) {
// need to clear
- updateMap(entity, false, true, v -> v.remove(context.getWorkflowId(), context));
+ updateMap(context.getEntity(), context, false, true, v -> v.remove(context.getWorkflowId(), context));
}
return;
}
@@ -110,28 +90,15 @@
}
}
- expireOldWorkflows(entity, context);
+ expireOldWorkflowsOnDisk(context.getEntity(), context);
}
- private void doGlobalUpdateIfNeeded() {
- if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY > System.currentTimeMillis()) return;
- lastInMemClear = System.currentTimeMillis();
- AtomicInteger total = new AtomicInteger(0);
- Collection<Entity> entities = mgmt.getEntityManager().getEntities();
- entities.forEach(entity -> {
- int change = expireOldWorkflows(entity, null);
- if (change!=0) log.debug("Global entity workflow persistence update, removed "+(-change)+" workflows from "+entity);
- total.addAndGet(change);
- });
- if (total.get()!=0) log.debug("Global entity workflow persistence update, removed "+(-total.get())+" workflows across all "+entities.size()+" entities");
- }
-
- public int expireOldWorkflows(Entity entity, @Nullable WorkflowExecutionContext context) {
+ int expireOldWorkflowsOnDisk(Entity entity, @Nullable WorkflowExecutionContext context) {
// clear interrupt status so we can persist e.g. if we are interrupted or shutdown
boolean interrupted = Thread.interrupted();
boolean doExpiry = WorkflowRetentionAndExpiration.isExpirationCheckNeeded(entity);
try {
- return updateMaps(entity, doExpiry, true, context==null ? null : v -> v.put(context.getWorkflowId(), context), null);
+ return updateMaps(entity, null, doExpiry, false, true, context==null ? null : v -> v.put(context.getWorkflowId(), context), null);
} finally {
if (interrupted) Thread.currentThread().interrupt();
@@ -142,7 +109,7 @@
if (w.getStatus()==null || w.getStatus().expirable || w.getStatus()== WorkflowExecutionContext.WorkflowStatus.STAGED) {
log.debug("Explicit request to delete workflow "+w);
AtomicBoolean result = new AtomicBoolean(false);
- updateMaps(w.getEntity(), false, true, map -> {
+ updateMaps(w.getEntity(), w, false, false, true, map -> {
boolean removed = WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true, true);
if (removed) result.set(true);
}, w);
@@ -153,26 +120,26 @@
}
}
- int updateMaps(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext contextToRemoveFromSoftMemory) {
- int result = updateMap(entity, doExpiry, persist, action);
+ int updateMaps(Entity entity, @Nullable WorkflowExecutionContext optionalContext, boolean doExpiryForSensor, boolean doExpiryInMemory, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext contextToRemoveFromSoftMemory) {
+ int result = updateMap(entity, optionalContext, doExpiryForSensor, persist, action);
// and update softly kept
WorkflowStateActiveInMemory activeInMemory = WorkflowStateActiveInMemory.get(mgmt);
if (contextToRemoveFromSoftMemory!=null) {
activeInMemory.deleteWorkflow(contextToRemoveFromSoftMemory);
}
- if (doExpiry) activeInMemory.recomputeExpiration(entity);
+ if (doExpiryInMemory) activeInMemory.recomputeExpiration(entity, optionalContext);
return result;
}
- int updateMap(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action) {
+ int updateMap(Entity entity, @Nullable WorkflowExecutionContext optionalContext, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action) {
AtomicInteger delta = new AtomicInteger(0);
entity.sensors().modify(INTERNAL_WORKFLOWS, vo -> {
Map<String, WorkflowExecutionContext> v = MutableMap.copyOf(vo);
delta.set(-v.size());
if (action!=null) action.accept(v);
- if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, null, false);
+ if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, optionalContext, false);
delta.getAndAdd(v.size());
return Maybe.of(v);
});
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
index a5dbe86..db5a2bd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
@@ -18,6 +18,18 @@
*/
package org.apache.brooklyn.core.workflow.utils;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
@@ -27,13 +39,7 @@
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
-
-import javax.annotation.Nullable;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
public class WorkflowRetentionParser {
@@ -82,17 +88,48 @@
continue;
}
- // TODO soft/hard keyword; take whichever occurs last
- if (retentionExpression.contains(" hash ")) {
- if (result.hash != null)
- throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression");
- int i = retentionExpression.indexOf(" hash ");
- result.hash = Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim();
- retentionExpression = retentionExpression.substring(0, i).trim();
- continue;
+ List<Pair<String,Integer>> specialTerms = MutableList.of();
+ for (String term: MutableList.of("hash", "soft", "hard"))
+ specialTerms.add(Pair.of(term, retentionExpression.indexOf(" "+term+" ")));
+ Collections.sort(specialTerms, (x,y) -> -Integer.compare(x.getRight(), y.getRight()));
+ Pair<String, Integer> last = specialTerms.iterator().next();
+ if (last.getRight()>=0) {
+ if ("hash".equals(last.getLeft())) {
+ if (result.hash != null)
+ throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression");
+ result.hash = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim();
+ retentionExpression = retentionExpression.substring(0, last.getRight()).trim();
+ continue;
+ }
+ if ("hard".equals(last.getLeft())) {
+ if (result.softExpiry != null)
+ throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression");
+ result.softExpiry = "0";
+ String hardTrailing = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim();
+ if (Strings.isNonBlank(hardTrailing)) {
+ if (last.getRight() == 0) retentionExpression = hardTrailing;
+ else throw new IllegalArgumentException("Cannot have retention definition both before and after 'hard' keyword");
+ } else {
+ retentionExpression = retentionExpression.substring(0, last.getRight()).trim();
+ }
+ continue;
+ }
+ if ("soft".equals(last.getLeft())) {
+ if (result.softExpiry != null)
+ throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression");
+ String softTrailing = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim();
+ if (Strings.isNonBlank(softTrailing)) {
+ result.softExpiry = softTrailing;
+ new WorkflowRetentionParser(result.softExpiry).soft().parse();
+ retentionExpression = retentionExpression.substring(0, last.getRight()).trim();
+ } else {
+ throw new IllegalArgumentException("Specification for 'soft' retetntion must provide retention expression after the keyword");
+ }
+ continue;
+ }
}
break;
- } while (false);
+ } while (true);
if (retentionExpression.equals("disabled")) {
result.disabled = true;
@@ -201,6 +238,8 @@
static abstract class KeepDelegate implements WorkflowRetentionFilter {
WorkflowRetentionFilter delegate;
+ final boolean soft;
+ KeepDelegate(boolean soft) { this.soft = soft; }
@Override
public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
if (delegate==null) throw new IllegalStateException("Not initialized");
@@ -214,30 +253,31 @@
protected abstract WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow);
}
static class KeepSystem extends KeepDelegate {
+ KeepSystem(boolean soft) { super(soft); }
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow==null) throw new IllegalStateException("Retention 'system' cannot be used here");
- return new WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT)).parse().init(null);
+ return new WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(
+ soft ? WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT_SOFT : WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT))
+ .soft(soft).parse().init(null);
}
@Override
public String toString() {
return "system";
}
}
- public static WorkflowRetentionFilter newDefaultFilter() {
- return new KeepParent();
- }
- public static WorkflowRetentionFilter newDefaultSoftFilter() {
- return new KeepSystem();
+ public static WorkflowRetentionFilter newDefaultFilter(boolean soft) {
+ return new KeepParent(soft);
}
static class KeepParent extends KeepDelegate {
+ KeepParent(boolean soft) { super(soft); }
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow == null) throw new IllegalStateException("Retention 'parent' cannot be used here");
else if (workflow.getParent()!=null) {
- return workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent());
+ return soft ? workflow.getParent().getRetentionSettings().getSoftExpiryFn(workflow.getParent()) : workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent());
} else {
- return new KeepSystem().init(workflow);
+ return new KeepSystem(soft).init(workflow);
}
}
@Override
@@ -246,12 +286,13 @@
}
}
static class KeepContext extends KeepDelegate {
+ KeepContext(boolean soft) { super(soft); }
@Override
public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) {
if (workflow == null) throw new IllegalStateException("Retention 'context' cannot be used here");
// expands to string to something that doesn't reference context so that this does not infinitely recurse
- return workflow.getRetentionSettings().getExpiryFn(workflow);
+ return soft ? workflow.getRetentionSettings().getSoftExpiryFn(workflow) : workflow.getRetentionSettings().getExpiryFn(workflow);
}
@Override
public String toString() {
@@ -261,17 +302,21 @@
String fullExpression;
String rest;
+ boolean soft = false;
public WorkflowRetentionParser(String fullExpression) {
this.fullExpression = fullExpression;
}
+ public WorkflowRetentionParser soft() { return soft(true); }
+ public WorkflowRetentionParser soft(boolean soft) { this.soft = soft; return this; }
+
public WorkflowRetentionFilter parse() {
- if (Strings.isBlank(fullExpression)) return newDefaultFilter();
+ if (Strings.isBlank(fullExpression)) return newDefaultFilter(soft);
rest = Strings.trimStart(fullExpression.toLowerCase());
WorkflowRetentionFilter result = parseTerm();
- if (!Strings.isBlank(rest)) return newDefaultFilter();
+ if (!Strings.isBlank(rest)) return newDefaultFilter(soft);
return result;
}
@@ -337,9 +382,9 @@
if (term.isPresent()) return term.get();
if (eatNA("all") || eatNA("forever")) return new KeepAll();
- if (eatNA("system")) return new KeepSystem();
- if (eatNA("parent")) return new KeepParent();
- if (eatNA("context")) return new KeepContext();
+ if (eatNA("system")) return new KeepSystem(soft);
+ if (eatNA("parent")) return new KeepParent(soft);
+ if (eatNA("context")) return new KeepContext(soft);
int i = maxPositive(rest.indexOf(","), rest.indexOf(")"));
if (i==-1) i = rest.length();
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index f90a0a2..4f5cb36 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -20,6 +20,7 @@
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.EntityLocal;
@@ -39,6 +40,7 @@
import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
import org.apache.brooklyn.entity.stock.BasicApplication;
import org.apache.brooklyn.test.Asserts;
@@ -927,6 +929,18 @@
Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 3);
+ // setting soft 4 should give one more, and allow hash keyword at end
+ w1 = doTestRetentionDisabled(2, "min(1,context) soft 4 hash my-fixed-hash", false, false, false);
+ Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved, "4");
+ Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
+ Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 4);
+
+ // and soft with min(2,...) allowing hash keyword before should also work
+ w1 = doTestRetentionDisabled(2, "min(1,context) hash my-fixed-hash soft min(2,system)", false, false, false);
+ Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved, "min(2,system)");
+ Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
+ Asserts.eventually(() -> wp.get().getWorkflows(app, true).keySet().size(), Predicates.equalTo(2), Duration.seconds(2));
+
// invoking our test gives a new workflow hash because the effector name is different
w2 = doTestRetentionDisabled(2, "1", false, false, false);
Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
@@ -978,7 +992,7 @@
// wait 5s and run something, it should cause everything else to expire
Time.sleep(Duration.FIVE_SECONDS);
- wp.get().expireOldWorkflows(app, null);
+ WorkflowRetentionAndExpiration.expireOldWorkflows(app);
// should now be empty
Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of());
@@ -991,17 +1005,17 @@
Time.sleep(Duration.seconds(5));
w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", "context", false, true, false);
// should now have all 3
- wp.get().expireOldWorkflows(app, null);
+ WorkflowRetentionAndExpiration.expireOldWorkflows(app);
Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId()));
Time.sleep(Duration.seconds(5));
// now just the last 1 (only 1 in 10s)
- wp.get().expireOldWorkflows(app, null);
+ WorkflowRetentionAndExpiration.expireOldWorkflows(app);
Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId()));
Time.sleep(Duration.seconds(5));
// still have last 1 (even after 10s)
- wp.get().expireOldWorkflows(app, null);
+ WorkflowRetentionAndExpiration.expireOldWorkflows(app);
Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId()));
// run two more, that's all we should have
diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
index 7d1849a..4defc5b 100644
--- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
@@ -129,7 +129,13 @@
}
if (latch instanceof TerminableCountDownLatch) ((TerminableCountDownLatch)latch).terminate();
}
- super.tearDown(Duration.millis(10)); // stops here can be blocked, don't wait on them
+ try {
+ super.tearDown(Duration.millis(50)); // stops here can be blocked, don't wait on them
+ } catch (Exception e) {
+ // we fail on this in case it is a real problem, but not believed to be, only seen occasionally, and not since timeout was increased 2024-04-01
+ LOG.warn("Teardown of test encountered exception; not unknown if multiple processes attempt to destroy, as destruction is deliberately unsynchronized to minimize race errors", e);
+ throw Exceptions.propagateAnnotated("Concurrent teardown issue", e);
+ }
if (executor != null) executor.shutdownNow();
} finally {
latches.clear();