blob: edb661fae1231371ffa83b5cf5ce49e699feb7fe [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.simulator;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;
import org.apache.cassandra.simulator.Ordered.StrictlyOrdered;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_SIMULATOR_DEBUG;
import static org.apache.cassandra.simulator.Action.Modifier.*;
import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
import static org.apache.cassandra.simulator.Action.Phase.CANCELLED;
import static org.apache.cassandra.simulator.Action.Phase.CONSEQUENCE;
import static org.apache.cassandra.simulator.Action.Phase.FINISHED;
import static org.apache.cassandra.simulator.Action.Phase.INVALIDATED;
import static org.apache.cassandra.simulator.Action.Phase.NASCENT;
import static org.apache.cassandra.simulator.Action.Phase.STARTED;
import static org.apache.cassandra.simulator.Action.Phase.WITHHELD;
import static org.apache.cassandra.simulator.Action.RegisteredType.CHILD;
import static org.apache.cassandra.simulator.Action.RegisteredType.LISTENER;
import static org.apache.cassandra.simulator.ActionListener.Before.DROP;
import static org.apache.cassandra.simulator.ActionListener.Before.INVALIDATE;
import static org.apache.cassandra.simulator.ActionListener.Before.EXECUTE;
import static org.apache.cassandra.simulator.utils.CompactLists.append;
import static org.apache.cassandra.simulator.utils.CompactLists.remove;
import static org.apache.cassandra.simulator.utils.CompactLists.safeForEach;
public abstract class Action implements PriorityQueueNode
private static final boolean DEBUG = TEST_SIMULATOR_DEBUG.getBoolean();
public enum Modifier
* Never drop an action. Primarily intended to transitively mark an action's descendants as
* required to succeed (e.g. in the case of repair and other cluster actions that are brittle)
RELIABLE('r', true),
* Mark an action to be discarded, which may result in some alternative action being undertaken.
* This marker is only to ensure correctness, as the simulator will fail if an action is performed
* that is marked DROP and RELIABLE.
DROP('f', false),
// indicates some scheduler delay should be added to the execution time of this action
// TODO (feature): support per node scheduler delay to simulate struggling nodes
THREAD_SIGNAL('t', false),
* a general purpose mechanism for withholding actions until all other actions have had an opportunity to run
* (intended primarily for TIMEOUT+NO_TIMEOUTS, but may be used elsewhere)
* this is a very similar feature to Ordered, but it was easier to model this way, as the predecessors are
* all other child actions in the entire transitive closure of operations, with the premise that the action
* will no longer be valid by the time it has an opportunity run
WITHHOLD((char)0, false),
// Mark operations as a THREAD_TIMEOUT, and parent operations as forbidding such timeouts (unless all else has failed)
NO_THREAD_TIMEOUTS('n', true, null, true), THREAD_TIMEOUT('t', false, NO_THREAD_TIMEOUTS),
* All children of this action should be performed in strict order wrt the parent's consequences
* i.e. this is the continuation version of {@link #STRICT_CHILD_ORDER}
* this is a bit clunky, but not the end of the world
* All children of this action should be performed in strict order, which means not only that
* they must be performed in the provided order, but all of their consequences must finish before
* the next sibling is permitted to run
* InfiniteLoopExecutors, when started, should be treated as detached from the action that happens to start them
* so the child action is considered to be orphaned, and not registered or counted against its parent action
ORPHAN('o', false),
* Must be combined with ORPHAN. Unlinks an Action from its direct parent, attaching it as a child of its
* grandparent. This is used to support streams of streams
ORPHAN_TO_GRANDPARENT((char)0, false),
* When we both deliver a message and timeout, the timeout may be scheduled for much later. We do not want to
* apply restrictions on later operations starting because we are waiting for a timeout to fire in this case,
* so we detach the timeout from its parent's accounting - but re-attach its children to the parent if
* still alive. Must be coincident with ORPHAN.
PSEUDO_ORPHAN('p', false),
* Recurring tasks, that the schedule may discount when determining if has terminated
STREAM((char)0, false),
* Recurring scheduled tasks, that the schedule should discount when determining if has terminated
DAEMON((char)0, false),
* Informational messages produced for logging only
INFO((char)0, false),
* A thread wakeup action that is ordinarily filtered from logging.
WAKEUP('w', false),
* Show this action in the chain of origin actions
DISPLAY_ORIGIN('d', false);
final char displayId;
final boolean heritable;
final Modifier withholdIfPresent;
final Modifier inheritIfContinuation;
private Modifiers asSet;
Modifier(char displayId, boolean heritable)
this(displayId, heritable, null);
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent)
this(displayId, heritable, withholdIfPresent, null);
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent, boolean inheritIfContinuation)
this.displayId = displayId;
this.heritable = heritable;
this.withholdIfPresent = withholdIfPresent;
this.inheritIfContinuation = inheritIfContinuation ? this : null;
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent, Modifier inheritIfContinuation)
this.displayId = displayId;
this.heritable = heritable;
this.withholdIfPresent = withholdIfPresent;
this.inheritIfContinuation = inheritIfContinuation;
Modifiers asSet()
if (asSet == null)
asSet = Modifiers.of(this);
return asSet;
public static class Modifiers implements Serializable
public static final Modifiers NONE = of();
public static final Modifiers INFO = Modifier.INFO.asSet();
public static final Modifiers RELIABLE = Modifier.RELIABLE.asSet();
public static final Modifiers DROP = Modifier.DROP.asSet();
public static final Modifiers PSEUDO_ORPHAN = of(Modifier.PSEUDO_ORPHAN);
public static final Modifiers STREAM = of(Modifier.STREAM);
public static final Modifiers INFINITE_STREAM = of(Modifier.STREAM, DAEMON);
public static final Modifiers STREAM_ITEM = of(Modifier.STREAM, ORPHAN, ORPHAN_TO_GRANDPARENT);
public static final Modifiers START_TASK = of(THREAD_SIGNAL);
public static final Modifiers START_THREAD = of(THREAD_SIGNAL);
public static final Modifiers START_INFINITE_LOOP = of(ORPHAN, THREAD_SIGNAL);
public static final Modifiers START_SCHEDULED_TASK = of(THREAD_SIGNAL);
public static final Modifiers START_TIMEOUT_TASK = of(Modifier.THREAD_TIMEOUT, THREAD_SIGNAL);
public static final Modifiers START_DAEMON_TASK = of(ORPHAN, Modifier.DAEMON, THREAD_SIGNAL);
public static final Modifiers WAKE_UP_THREAD = of(THREAD_SIGNAL, WAKEUP);
public static final Modifiers STRICT = of(STRICT_CHILD_ORDER);
public static final Modifiers NO_TIMEOUTS = Modifier.NO_THREAD_TIMEOUTS.asSet();
public static final Modifiers RELIABLE_NO_TIMEOUTS = of(Modifier.NO_THREAD_TIMEOUTS, Modifier.RELIABLE);
public static final Modifiers DISPLAY_ORIGIN = of(Modifier.DISPLAY_ORIGIN);
public static Modifiers of()
return new Modifiers(EnumSet.noneOf(Modifier.class));
public static Modifiers of(Modifier first, Modifier ... rest)
return new Modifiers(EnumSet.of(first, rest));
final EnumSet<Modifier> contents;
Modifiers(EnumSet<Modifier> contents)
this.contents = contents;
public Modifiers with(Modifiers add)
if (add == NONE)
return this;
if (this == NONE)
return add;
if (contents.containsAll(add.contents))
return this;
return add(add.contents);
public Modifiers with(Modifier add)
if (this == NONE)
return add.asSet();
if (contents.contains(add))
return this;
return add(add.asSet().contents);
private Modifiers add(EnumSet<Modifier> add)
EnumSet<Modifier> merge = EnumSet.noneOf(Modifier.class);
for (Modifier modifier : this.contents) add(modifier, merge, add);
for (Modifier modifier : add) add(modifier, merge, this.contents);
return new Modifiers(merge);
private static void add(Modifier modifier, EnumSet<Modifier> to, EnumSet<Modifier> mergingWith)
if (modifier.withholdIfPresent != null && mergingWith.contains(modifier.withholdIfPresent))
// for continuations to inherit the relevant modifiers from their immediate parent
// (since we represent a continuation of the same execution)
public Modifiers inheritIfContinuation(Modifiers inheritIfContinuation)
EnumSet<Modifier> merge = null;
for (Modifier modifier : inheritIfContinuation.contents)
if (modifier.inheritIfContinuation != null)
if (merge == null && !contents.contains(modifier.inheritIfContinuation)) merge = EnumSet.copyOf(contents);
if (merge != null) merge.add(modifier.inheritIfContinuation);
if (merge == null)
return this;
if (!merge.contains(WITHHOLD))
for (Modifier modifier : merge)
if (modifier.withholdIfPresent != null && merge.contains(modifier.withholdIfPresent))
return new Modifiers(merge);
public Modifiers without(Modifier modifier)
if (!contents.contains(modifier))
return this;
EnumSet<Modifier> remove = EnumSet.noneOf(Modifier.class);
return new Modifiers(remove);
public boolean is(Modifier modifier)
return contents.contains(modifier);
enum RegisteredType { LISTENER, CHILD }
enum Phase
// configuration/status
private final Object description;
private OrderOns orderOn;
private Modifiers self, transitive;
private Phase phase = NASCENT;
Ordered ordered;
/** The listeners (and, if DEBUG, children) we have already registered */
private final Map<Object, RegisteredType> registered = new IdentityHashMap<>(2);
/** The list of listeners (for deterministic evaluation order) to notify on any event */
private List<ActionListener> listeners;
/** The immediate parent, and furthest ancestor of this Action */
protected Action parent, origin = this, pseudoParent;
/** The number of direct consequences of this action that have not <i>transitively</i> terminated */
private int childCount;
* Consequences marked WITHHOLD are kept in their parent (or parent thread's) {@code withheld} queue until all
* other immediate children have <i>transitively</i> terminated their execution
private DefaultPriorityQueue<Action> withheld;
// scheduler and scheduled state
protected RunnableActionScheduler scheduler;
private long deadline; // some actions have an associated wall clock time to execute and are first scheduled by this
private double priority; // all actions eventually get prioritised for execution in some order "now"
// used to track the index and priority queue we're being managed for execution by (either by scheduledAt or priority)
private PriorityQueue<?> scheduledIn;
private int scheduledIndex = -1;
// used to track the scheduledAt of events we have moved to actively scheduling/prioritising
private PriorityQueue<?> savedIn;
private int savedIndex = -1;
public Action(Object description, Modifiers self)
this(description, self, NONE);
public Action(Object description, Modifiers self, Modifiers transitive)
this(description, OrderOn.NONE, self, transitive);
public Action(Object description, OrderOns orderOn, Modifiers self, Modifiers transitive)
this.description = description;
if (orderOn == null || self == null || transitive == null)
throw new IllegalArgumentException();
assert -> m.heritable) : transitive.contents.toString();
this.orderOn = orderOn;
this.self = self;
this.transitive = transitive;
public Object description()
return description;
public OrderOns orderOns() { return orderOn; }
public Phase phase() { return phase; }
public Modifiers self() { return self; }
public Modifiers transitive() { return transitive; }
public boolean is(Modifier modifier)
return self.contents.contains(modifier);
public void inherit(Modifiers add)
if (add != NONE)
add(add, add);
public void add(Modifiers self, Modifiers children)
this.self = this.self.with(self);
this.transitive = this.transitive.with(children);
public boolean isStarted()
return phase.compareTo(STARTED) >= 0;
public boolean isFinished()
return phase.compareTo(FINISHED) >= 0;
public boolean isCancelled()
return phase.compareTo(CANCELLED) >= 0;
public boolean isInvalidated()
return phase.compareTo(INVALIDATED) >= 0;
public Action parent()
return parent;
public int childCount()
return childCount;
* Main implementation of {@link #perform()}, that must be completed by an extending classes.
* Does not need to handle consequences, registration etc.
* @return the action consequences
protected abstract ActionList performSimple();
* Invokes {@link #performSimple} before invoking {@link #performed}.
* May be overridden by an extending classes that does not finish immediately (e.g, SimulatedAction).
* MUST handle consequences, registration etc by invoking performed() on its results before returning,
* to register children and record the action's state
* @return the action consequences
protected ActionList performAndRegister()
return performed(performSimple(), true, true);
* Invoke the action, and return its consequences, i.e. any follow up actions.
public final ActionList perform()
Preconditions.checkState(!(is(RELIABLE) && is(Modifier.DROP)));
Throwable fail = safeForEach(listeners, ActionListener::before, this, is(Modifier.DROP) ? DROP : EXECUTE);
if (fail != null)
if (DEBUG && parent != null && parent.registered.get(this) != CHILD) throw new AssertionError();
ActionList next = performAndRegister();
if (is(STRICT_CHILD_ORDER)) next.setStrictlySequentialOn(this);
else if (is(STRICT_CHILD_OF_PARENT_ORDER)) next.setStrictlySequentialOn(parent);
return next;
* To be invoked on the results of {@link #performSimple()} by its implementations.
* We invite the implementation to invoke it so that it may control state either side of its invocation.
* {@link #register(ActionList)}'s the consequences, restores any old withheld actions,
* and updates this Action's internal state.
* @return the provided actions, minus any withheld
protected ActionList performed(ActionList consequences, boolean isStart, boolean isFinish)
assert isStarted() != isStart;
assert !isFinished();
consequences = register(consequences);
assert !consequences.anyMatch(c ->;
if (isFinish) finishedSelf();
else if (isStart) phase = STARTED;
return restoreWithheld(consequences);
* Similar to cancel() but invoked under abnormal termination
public void invalidate()
* To be invoked when this action has become redundant.
* - Marks itself invalidated
* - Notifies its listeners (which may remove it from any ordered sequences in the ActionSchedule)
* - If present, clears itself directly from:
* - its parent's withholding space
* - the schedule's priority queue
public void cancel()
assert !isStarted();
private void invalidate(Phase advanceTo)
if (phase.compareTo(CANCELLED) >= 0)
Throwable fail = safeForEach(listeners, ActionListener::before, this, INVALIDATE);
fail = Throwables.merge(fail, safeInvalidate(phase == CANCELLED));
invalidate(phase == CANCELLED);
protected Throwable safeInvalidate(boolean isCancellation)
return null;
private void invalidate(boolean isCancellation)
if (parent != null && parent.withheld != null && is(WITHHOLD))
if (parent.withheld.remove(this))
if (scheduledIndex >= 0) scheduledIn.remove(this);
if (savedIndex >= 0) savedIn.remove(this);
if (ordered != null) ordered.invalidate(isCancellation);
* Register consequences of this action, i.e.:
* - attach a scheduler to them for prioritising when they are permitted to execute
* - pass them to any listeners as consequences
* - count them as children, and mark ourselves as parent, so that we may track transitive completion
* - withhold any actions that are so marked, to be {@link #restoreWithheld}d once we have transitively completed
* all non-withheld actions.
protected ActionList register(ActionList consequences)
assert !isFinished();
if (consequences.isEmpty())
return consequences;
Throwable fail = safeForEach(listeners, ActionListener::consequences, consequences);
if (fail != null)
Throwables.merge(fail, consequences.safeForEach(Action::invalidate));
boolean isParentPseudoOrphan = is(PSEUDO_ORPHAN);
boolean withheld = false;
for (int i = 0 ; i < consequences.size() ; ++i)
Action child = consequences.get(i);
if (
if (parent != null &&
else if (
assert !;
Action parent;
if (isParentPseudoOrphan && pseudoParent != null && pseudoParent.childCount > 0)
parent = pseudoParent;
parent = this;
if (
// this could be supported in principle by applying the ordering here, but it would be
// some work to ensure it doesn't lead to deadlocks so for now just assert we don't use it
Preconditions.checkState(! && !;
withheld = true;
if (!withheld)
return consequences;
return consequences.filter(child -> !;
// setup the child relationship, but do not update childCount
private void registerChild(Action child)
assert child.parent == null;
child.parent = this;
if (DEBUG && !register(child, CHILD)) throw new AssertionError();
private void registerPseudoOrphan(Action child)
assert child.parent == null;
assert child.pseudoParent == null;
child.pseudoParent = this;
private void registerChildOrigin(Action child)
if (is(Modifier.DISPLAY_ORIGIN)) child.origin = this;
else if (origin != this) child.origin = origin;
private boolean register(Object object, RegisteredType type)
RegisteredType prev = registered.putIfAbsent(object, type);
if (prev != null && prev != type)
throw new AssertionError();
return prev == null;
public void register(ActionListener listener)
if (register(listener, LISTENER))
listeners = append(listeners, listener);
private boolean deregister(Object object, RegisteredType type)
return registered.remove(object, type);
public void deregister(ActionListener listener)
if (deregister(listener, LISTENER))
listeners = remove(listeners, listener);
private void addWithheld(Action action)
if (withheld == null)
withheld = new DefaultPriorityQueue<>(Action::compareByPriority, 2);
* Restore withheld (by ourselves or a parent) actions, when no other outstanding actions remain
public ActionList restoreWithheld(ActionList consequences)
if (withheld != null && childCount == withheld.size())
Action next = withheld.poll();
consequences = consequences.andThen(next);
else if (childCount == 0 && parent != null)
Action cur = parent;
while (cur.childCount == 0 && cur.parent != null)
cur = cur.parent;
consequences = cur.restoreWithheld(consequences);
return consequences;
private void cleanupWithheld()
Action cur = this;
if (cur.withheld.isEmpty())
cur.withheld = null;
* Invoked once we finish executing ourselves. Typically this occurs immediately after invocation,
* but for SimulatedAction it occurs only once the thread terminates its execution.
* In practice this is entirely determined by the {@code isFinished} parameter supplied to
* {@link #performed(ActionList, boolean, boolean)}.
void finishedSelf()
if (phase.compareTo(CANCELLED) < 0)
scheduler = null;
if (withheld != null)
Queue<Action> withheld = this.withheld;
this.withheld = null;
Throwable fail = safeForEach(listeners, ActionListener::after, this);
if (childCount == 0)
fail = Throwables.merge(fail, transitivelyFinished());
if (fail != null)
* Invoked once all of the consequences of this action, and of those actions (recursively) have completed.
Throwable transitivelyFinished()
return transitivelyFinished(this);
static Throwable transitivelyFinished(Action cur)
Throwable fail = null;
while (true)
Action parent = cur.parent;
assert 0 == cur.childCount && cur.isFinished();
if (DEBUG && cur.registered.values().stream().anyMatch(t -> t == CHILD)) throw new AssertionError();
fail = Throwables.merge(fail, safeForEach(cur.listeners, ActionListener::transitivelyAfter, cur));
if (parent == null)
if (DEBUG && CHILD != parent.registered.remove(cur)) throw new AssertionError();
if (--parent.childCount == 0 && parent.isFinished()) cur = parent;
else break;
return fail;
void orderOn(OrderOn orderOn)
this.orderOn = this.orderOn.with(orderOn);
void setupOrdering(ActionSchedule schedule)
if (orderOn.isOrdered())
ordered = orderOn.isStrict() ? new StrictlyOrdered(this, schedule) : new Ordered(this, schedule);
for (int i = 0, maxi = orderOn.size(); i < maxi ; ++i)
void advanceTo(Phase phase)
Preconditions.checkArgument(phase.compareTo(this.phase) > 0);
this.phase = phase;
void addTo(PriorityQueue<Action> byDeadline)
Preconditions.checkState(scheduledIndex < 0);
scheduledIn = byDeadline;
void saveIn(PriorityQueue<Action> saveIn)
Preconditions.checkState(savedIndex < 0);
savedIn = saveIn;
void setScheduler(RunnableActionScheduler scheduler)
Preconditions.checkState(this.scheduler == null);
Preconditions.checkState(this.phase == NASCENT);
this.scheduler = scheduler;
void setConsequence()
void schedule(SimulatedTime time, FutureActionScheduler future)
setPriority(time, scheduler.priority());
if (is(THREAD_SIGNAL) || deadline == 0)
long newDeadline = deadline == 0 ? time.nanoTime() : deadline;
newDeadline += future.schedulerDelayNanos();
deadline = newDeadline;
time.onTimeEvent("ResetDeadline", newDeadline);
public void setDeadline(SimulatedTime time, long deadlineNanos)
Preconditions.checkState(deadline == 0);
Preconditions.checkArgument(deadlineNanos >= deadline);
deadline = deadlineNanos;
time.onTimeEvent("SetDeadline", deadlineNanos);
public void setPriority(SimulatedTime time, double priority)
this.priority = priority;
time.onTimeEvent("SetPriority", Double.doubleToLongBits(priority));
public long deadline()
if (deadline < 0) throw new AssertionError();
return deadline;
public double priority()
return priority;
public int priorityQueueIndex(DefaultPriorityQueue<?> queue)
if (queue == scheduledIn) return scheduledIndex;
else if (queue == savedIn) return savedIndex;
else return -1;
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i)
if (queue == scheduledIn) { scheduledIndex = i; if (i < 0) scheduledIn = null; }
else if (queue == savedIn) { savedIndex = i; if (i < 0) savedIn = null; }
else throw new IllegalStateException();
public int compareByDeadline(Action that)
return, that.deadline);
public int compareByPriority(Action that)
return, that.priority);
private String describeModifiers()
StringBuilder builder = new StringBuilder("[");
for (Modifier modifier : self.contents)
if (modifier.displayId == 0)
if (! builder.append(modifier.displayId);
else builder.append(Character.toUpperCase(modifier.displayId));
boolean hasTransitiveOnly = false;
for (Modifier modifier : transitive.contents)
if (modifier.displayId == 0)
if (!
if (!hasTransitiveOnly)
hasTransitiveOnly = true;
if (builder.length() == 1)
return "";
if (hasTransitiveOnly)
return builder.toString();
public String toString()
return describeModifiers() + description() + (origin != this ? " for " + origin : "");
public String toReconcileString()
return this + " at [" + deadline + ',' + priority + ']';
private static class StackElement
final Action action;
final Deque<Action> children;
private StackElement(Action action)
this.action = action;
this.children = new ArrayDeque<>(action.childCount);
for (Map.Entry<Object, RegisteredType> e : action.registered.entrySet())
if (e.getValue() == CHILD)
children.add((Action) e.getKey());
public String describeCurrentState()
StringBuilder sb = new StringBuilder();
Deque<StackElement> stack = new ArrayDeque<>();
stack.push(new StackElement(this));
while (!stack.isEmpty())
StackElement last = stack.peek();
if (last.children.isEmpty())
Action child = last.children.pop();
appendPrefix(stack.size(), sb);
stack.push(new StackElement(child));
return sb.toString();
private static void appendPrefix(int count, StringBuilder sb)
while (--count >= 0)
sb.append(" |");
private void appendCurrentState(StringBuilder sb)
if (!isStarted()) sb.append("NOT_STARTED ");
else if (!isFinished()) sb.append("NOT_FINISHED ");
if (childCount > 0)
sb.append(") ");
if (orderOn.isOrdered())
sb.append(": ");