blob: edb661fae1231371ffa83b5cf5ce49e699feb7fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import com.google.common.base.Preconditions;
import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;
import org.apache.cassandra.simulator.Ordered.StrictlyOrdered;
import org.apache.cassandra.simulator.systems.SimulatedTime;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_SIMULATOR_DEBUG;
import static org.apache.cassandra.simulator.Action.Modifier.*;
import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
import static org.apache.cassandra.simulator.Action.Phase.CANCELLED;
import static org.apache.cassandra.simulator.Action.Phase.CONSEQUENCE;
import static org.apache.cassandra.simulator.Action.Phase.FINISHED;
import static org.apache.cassandra.simulator.Action.Phase.INVALIDATED;
import static org.apache.cassandra.simulator.Action.Phase.NASCENT;
import static org.apache.cassandra.simulator.Action.Phase.STARTED;
import static org.apache.cassandra.simulator.Action.Phase.WITHHELD;
import static org.apache.cassandra.simulator.Action.RegisteredType.CHILD;
import static org.apache.cassandra.simulator.Action.RegisteredType.LISTENER;
import static org.apache.cassandra.simulator.ActionListener.Before.DROP;
import static org.apache.cassandra.simulator.ActionListener.Before.INVALIDATE;
import static org.apache.cassandra.simulator.ActionListener.Before.EXECUTE;
import static org.apache.cassandra.simulator.utils.CompactLists.append;
import static org.apache.cassandra.simulator.utils.CompactLists.remove;
import static org.apache.cassandra.simulator.utils.CompactLists.safeForEach;
public abstract class Action implements PriorityQueueNode
{
private static final boolean DEBUG = TEST_SIMULATOR_DEBUG.getBoolean();
public enum Modifier
{
/**
* Never drop an action. Primarily intended to transitively mark an action's descendants as
* required to succeed (e.g. in the case of repair and other cluster actions that are brittle)
*/
RELIABLE('r', true),
/**
* Mark an action to be discarded, which may result in some alternative action being undertaken.
* This marker is only to ensure correctness, as the simulator will fail if an action is performed
* that is marked DROP and RELIABLE.
*/
DROP('f', false),
// indicates some scheduler delay should be added to the execution time of this action
// TODO (feature): support per node scheduler delay to simulate struggling nodes
THREAD_SIGNAL('t', false),
/**
* a general purpose mechanism for withholding actions until all other actions have had an opportunity to run
* (intended primarily for TIMEOUT+NO_TIMEOUTS, but may be used elsewhere)
* this is a very similar feature to Ordered, but it was easier to model this way, as the predecessors are
* all other child actions in the entire transitive closure of operations, with the premise that the action
* will no longer be valid by the time it has an opportunity run
*/
WITHHOLD((char)0, false),
// Mark operations as a THREAD_TIMEOUT, and parent operations as forbidding such timeouts (unless all else has failed)
NO_THREAD_TIMEOUTS('n', true, null, true), THREAD_TIMEOUT('t', false, NO_THREAD_TIMEOUTS),
/**
* All children of this action should be performed in strict order wrt the parent's consequences
* i.e. this is the continuation version of {@link #STRICT_CHILD_ORDER}
* this is a bit clunky, but not the end of the world
*/
STRICT_CHILD_OF_PARENT_ORDER('c', false),
/**
* All children of this action should be performed in strict order, which means not only that
* they must be performed in the provided order, but all of their consequences must finish before
* the next sibling is permitted to run
*/
STRICT_CHILD_ORDER('s', true, null, STRICT_CHILD_OF_PARENT_ORDER),
/**
* InfiniteLoopExecutors, when started, should be treated as detached from the action that happens to start them
* so the child action is considered to be orphaned, and not registered or counted against its parent action
*/
ORPHAN('o', false),
/**
* Must be combined with ORPHAN. Unlinks an Action from its direct parent, attaching it as a child of its
* grandparent. This is used to support streams of streams
*/
ORPHAN_TO_GRANDPARENT((char)0, false),
/**
* When we both deliver a message and timeout, the timeout may be scheduled for much later. We do not want to
* apply restrictions on later operations starting because we are waiting for a timeout to fire in this case,
* so we detach the timeout from its parent's accounting - but re-attach its children to the parent if
* still alive. Must be coincident with ORPHAN.
*/
PSEUDO_ORPHAN('p', false),
/**
* Recurring tasks, that the schedule may discount when determining if has terminated
*/
STREAM((char)0, false),
/**
* Recurring scheduled tasks, that the schedule should discount when determining if has terminated
*/
DAEMON((char)0, false),
/**
* Informational messages produced for logging only
*/
INFO((char)0, false),
/**
* A thread wakeup action that is ordinarily filtered from logging.
*/
WAKEUP('w', false),
/**
* Show this action in the chain of origin actions
*/
DISPLAY_ORIGIN('d', false);
final char displayId;
final boolean heritable;
final Modifier withholdIfPresent;
final Modifier inheritIfContinuation;
private Modifiers asSet;
Modifier(char displayId, boolean heritable)
{
this(displayId, heritable, null);
}
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent)
{
this(displayId, heritable, withholdIfPresent, null);
}
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent, boolean inheritIfContinuation)
{
this.displayId = displayId;
this.heritable = heritable;
this.withholdIfPresent = withholdIfPresent;
this.inheritIfContinuation = inheritIfContinuation ? this : null;
}
Modifier(char displayId, boolean heritable, Modifier withholdIfPresent, Modifier inheritIfContinuation)
{
this.displayId = displayId;
this.heritable = heritable;
this.withholdIfPresent = withholdIfPresent;
this.inheritIfContinuation = inheritIfContinuation;
}
Modifiers asSet()
{
if (asSet == null)
asSet = Modifiers.of(this);
return asSet;
}
}
public static class Modifiers implements Serializable
{
public static final Modifiers NONE = of();
public static final Modifiers INFO = Modifier.INFO.asSet();
public static final Modifiers RELIABLE = Modifier.RELIABLE.asSet();
public static final Modifiers DROP = Modifier.DROP.asSet();
public static final Modifiers PSEUDO_ORPHAN = of(Modifier.PSEUDO_ORPHAN);
public static final Modifiers STREAM = of(Modifier.STREAM);
public static final Modifiers INFINITE_STREAM = of(Modifier.STREAM, DAEMON);
public static final Modifiers STREAM_ITEM = of(Modifier.STREAM, ORPHAN, ORPHAN_TO_GRANDPARENT);
public static final Modifiers INFINITE_STREAM_ITEM = of(Modifier.STREAM, DAEMON, ORPHAN, ORPHAN_TO_GRANDPARENT);
public static final Modifiers START_TASK = of(THREAD_SIGNAL);
public static final Modifiers START_THREAD = of(THREAD_SIGNAL);
public static final Modifiers START_INFINITE_LOOP = of(ORPHAN, THREAD_SIGNAL);
public static final Modifiers START_SCHEDULED_TASK = of(THREAD_SIGNAL);
public static final Modifiers START_TIMEOUT_TASK = of(Modifier.THREAD_TIMEOUT, THREAD_SIGNAL);
public static final Modifiers START_DAEMON_TASK = of(ORPHAN, Modifier.DAEMON, THREAD_SIGNAL);
public static final Modifiers WAKE_UP_THREAD = of(THREAD_SIGNAL, WAKEUP);
public static final Modifiers STRICT = of(STRICT_CHILD_ORDER);
public static final Modifiers NO_TIMEOUTS = Modifier.NO_THREAD_TIMEOUTS.asSet();
public static final Modifiers RELIABLE_NO_TIMEOUTS = of(Modifier.NO_THREAD_TIMEOUTS, Modifier.RELIABLE);
public static final Modifiers DISPLAY_ORIGIN = of(Modifier.DISPLAY_ORIGIN);
public static Modifiers of()
{
return new Modifiers(EnumSet.noneOf(Modifier.class));
}
public static Modifiers of(Modifier first, Modifier ... rest)
{
return new Modifiers(EnumSet.of(first, rest));
}
final EnumSet<Modifier> contents;
Modifiers(EnumSet<Modifier> contents)
{
this.contents = contents;
}
public Modifiers with(Modifiers add)
{
if (add == NONE)
return this;
if (this == NONE)
return add;
if (contents.containsAll(add.contents))
return this;
return add(add.contents);
}
public Modifiers with(Modifier add)
{
if (this == NONE)
return add.asSet();
if (contents.contains(add))
return this;
return add(add.asSet().contents);
}
private Modifiers add(EnumSet<Modifier> add)
{
EnumSet<Modifier> merge = EnumSet.noneOf(Modifier.class);
for (Modifier modifier : this.contents) add(modifier, merge, add);
for (Modifier modifier : add) add(modifier, merge, this.contents);
return new Modifiers(merge);
}
private static void add(Modifier modifier, EnumSet<Modifier> to, EnumSet<Modifier> mergingWith)
{
if (modifier.withholdIfPresent != null && mergingWith.contains(modifier.withholdIfPresent))
to.add(WITHHOLD);
to.add(modifier);
}
// for continuations to inherit the relevant modifiers from their immediate parent
// (since we represent a continuation of the same execution)
public Modifiers inheritIfContinuation(Modifiers inheritIfContinuation)
{
EnumSet<Modifier> merge = null;
for (Modifier modifier : inheritIfContinuation.contents)
{
if (modifier.inheritIfContinuation != null)
{
if (merge == null && !contents.contains(modifier.inheritIfContinuation)) merge = EnumSet.copyOf(contents);
if (merge != null) merge.add(modifier.inheritIfContinuation);
}
}
if (merge == null)
return this;
if (!merge.contains(WITHHOLD))
{
for (Modifier modifier : merge)
{
if (modifier.withholdIfPresent != null && merge.contains(modifier.withholdIfPresent))
merge.add(WITHHOLD);
}
}
return new Modifiers(merge);
}
public Modifiers without(Modifier modifier)
{
if (!contents.contains(modifier))
return this;
EnumSet<Modifier> remove = EnumSet.noneOf(Modifier.class);
remove.addAll(this.contents);
remove.remove(modifier);
return new Modifiers(remove);
}
public boolean is(Modifier modifier)
{
return contents.contains(modifier);
}
}
enum RegisteredType { LISTENER, CHILD }
enum Phase
{
NASCENT,
WITHHELD,
CONSEQUENCE,
READY_TO_SCHEDULE,
SEQUENCED_PRE_SCHEDULED,
SCHEDULED,
SEQUENCED_POST_SCHEDULED,
RUNNABLE,
STARTED,
FINISHED,
CANCELLED,
INVALIDATED
}
// configuration/status
private final Object description;
private OrderOns orderOn;
private Modifiers self, transitive;
private Phase phase = NASCENT;
Ordered ordered;
/** The listeners (and, if DEBUG, children) we have already registered */
private final Map<Object, RegisteredType> registered = new IdentityHashMap<>(2);
/** The list of listeners (for deterministic evaluation order) to notify on any event */
private List<ActionListener> listeners;
/** The immediate parent, and furthest ancestor of this Action */
protected Action parent, origin = this, pseudoParent;
/** The number of direct consequences of this action that have not <i>transitively</i> terminated */
private int childCount;
/**
* Consequences marked WITHHOLD are kept in their parent (or parent thread's) {@code withheld} queue until all
* other immediate children have <i>transitively</i> terminated their execution
*/
private DefaultPriorityQueue<Action> withheld;
// scheduler and scheduled state
protected RunnableActionScheduler scheduler;
private long deadline; // some actions have an associated wall clock time to execute and are first scheduled by this
private double priority; // all actions eventually get prioritised for execution in some order "now"
// used to track the index and priority queue we're being managed for execution by (either by scheduledAt or priority)
private PriorityQueue<?> scheduledIn;
private int scheduledIndex = -1;
// used to track the scheduledAt of events we have moved to actively scheduling/prioritising
private PriorityQueue<?> savedIn;
private int savedIndex = -1;
public Action(Object description, Modifiers self)
{
this(description, self, NONE);
}
public Action(Object description, Modifiers self, Modifiers transitive)
{
this(description, OrderOn.NONE, self, transitive);
}
public Action(Object description, OrderOns orderOn, Modifiers self, Modifiers transitive)
{
this.description = description;
if (orderOn == null || self == null || transitive == null)
throw new IllegalArgumentException();
assert transitive.contents.stream().allMatch(m -> m.heritable) : transitive.contents.toString();
this.orderOn = orderOn;
this.self = self;
this.transitive = transitive;
}
public Object description()
{
return description;
}
public OrderOns orderOns() { return orderOn; }
public Phase phase() { return phase; }
public Modifiers self() { return self; }
public Modifiers transitive() { return transitive; }
public boolean is(Modifier modifier)
{
return self.contents.contains(modifier);
}
public void inherit(Modifiers add)
{
if (add != NONE)
add(add, add);
}
public void add(Modifiers self, Modifiers children)
{
this.self = this.self.with(self);
this.transitive = this.transitive.with(children);
}
public boolean isStarted()
{
return phase.compareTo(STARTED) >= 0;
}
public boolean isFinished()
{
return phase.compareTo(FINISHED) >= 0;
}
public boolean isCancelled()
{
return phase.compareTo(CANCELLED) >= 0;
}
public boolean isInvalidated()
{
return phase.compareTo(INVALIDATED) >= 0;
}
public Action parent()
{
return parent;
}
public int childCount()
{
return childCount;
}
/**
* Main implementation of {@link #perform()}, that must be completed by an extending classes.
*
* Does not need to handle consequences, registration etc.
*
* @return the action consequences
*/
protected abstract ActionList performSimple();
/**
* Invokes {@link #performSimple} before invoking {@link #performed}.
*
* May be overridden by an extending classes that does not finish immediately (e.g, SimulatedAction).
*
* MUST handle consequences, registration etc by invoking performed() on its results before returning,
* to register children and record the action's state
*
* @return the action consequences
*/
protected ActionList performAndRegister()
{
return performed(performSimple(), true, true);
}
/**
* Invoke the action, and return its consequences, i.e. any follow up actions.
*/
public final ActionList perform()
{
Preconditions.checkState(!(is(RELIABLE) && is(Modifier.DROP)));
Throwable fail = safeForEach(listeners, ActionListener::before, this, is(Modifier.DROP) ? DROP : EXECUTE);
if (fail != null)
{
invalidate(false);
Throwables.maybeFail(fail);
}
if (DEBUG && parent != null && parent.registered.get(this) != CHILD) throw new AssertionError();
ActionList next = performAndRegister();
next.forEach(Action::setConsequence);
if (is(STRICT_CHILD_ORDER)) next.setStrictlySequentialOn(this);
else if (is(STRICT_CHILD_OF_PARENT_ORDER)) next.setStrictlySequentialOn(parent);
return next;
}
/**
* To be invoked on the results of {@link #performSimple()} by its implementations.
* We invite the implementation to invoke it so that it may control state either side of its invocation.
*
* {@link #register(ActionList)}'s the consequences, restores any old withheld actions,
* and updates this Action's internal state.
*
* @return the provided actions, minus any withheld
*/
protected ActionList performed(ActionList consequences, boolean isStart, boolean isFinish)
{
assert isStarted() != isStart;
assert !isFinished();
consequences = register(consequences);
assert !consequences.anyMatch(c -> c.is(WITHHOLD));
if (isFinish) finishedSelf();
else if (isStart) phase = STARTED;
return restoreWithheld(consequences);
}
/**
* Similar to cancel() but invoked under abnormal termination
*/
public void invalidate()
{
invalidate(INVALIDATED);
}
/**
* To be invoked when this action has become redundant.
* - Marks itself invalidated
* - Notifies its listeners (which may remove it from any ordered sequences in the ActionSchedule)
* - If present, clears itself directly from:
* - its parent's withholding space
* - the schedule's priority queue
*/
public void cancel()
{
assert !isStarted();
invalidate(CANCELLED);
}
private void invalidate(Phase advanceTo)
{
if (phase.compareTo(CANCELLED) >= 0)
return;
advanceTo(advanceTo);
Throwable fail = safeForEach(listeners, ActionListener::before, this, INVALIDATE);
fail = Throwables.merge(fail, safeInvalidate(phase == CANCELLED));
invalidate(phase == CANCELLED);
finishedSelf();
Throwables.maybeFail(fail);
}
protected Throwable safeInvalidate(boolean isCancellation)
{
return null;
}
private void invalidate(boolean isCancellation)
{
if (parent != null && parent.withheld != null && is(WITHHOLD))
{
if (parent.withheld.remove(this))
parent.cleanupWithheld();
}
if (scheduledIndex >= 0) scheduledIn.remove(this);
if (savedIndex >= 0) savedIn.remove(this);
if (ordered != null) ordered.invalidate(isCancellation);
}
/**
* Register consequences of this action, i.e.:
* - attach a scheduler to them for prioritising when they are permitted to execute
* - pass them to any listeners as consequences
* - count them as children, and mark ourselves as parent, so that we may track transitive completion
* - withhold any actions that are so marked, to be {@link #restoreWithheld}d once we have transitively completed
* all non-withheld actions.
*/
protected ActionList register(ActionList consequences)
{
assert !isFinished();
if (consequences.isEmpty())
return consequences;
scheduler.attachTo(consequences);
Throwable fail = safeForEach(listeners, ActionListener::consequences, consequences);
if (fail != null)
{
invalidate(false);
Throwables.merge(fail, consequences.safeForEach(Action::invalidate));
Throwables.maybeFail(fail);
}
boolean isParentPseudoOrphan = is(PSEUDO_ORPHAN);
boolean withheld = false;
for (int i = 0 ; i < consequences.size() ; ++i)
{
Action child = consequences.get(i);
if (child.is(ORPHAN))
{
if (parent != null && child.is(ORPHAN_TO_GRANDPARENT))
{
++parent.childCount;
parent.registerChild(child);
}
else if (child.is(PSEUDO_ORPHAN))
{
child.inherit(transitive);
registerPseudoOrphan(child);
assert !child.is(WITHHOLD);
}
}
else
{
Action parent;
if (isParentPseudoOrphan && pseudoParent != null && pseudoParent.childCount > 0)
parent = pseudoParent;
else
parent = this;
child.inherit(parent.transitive);
if (child.is(WITHHOLD))
{
// this could be supported in principle by applying the ordering here, but it would be
// some work to ensure it doesn't lead to deadlocks so for now just assert we don't use it
Preconditions.checkState(!parent.is(STRICT_CHILD_ORDER) && !parent.is(STRICT_CHILD_OF_PARENT_ORDER));
withheld = true;
parent.addWithheld(child);
}
parent.registerChild(child);
parent.childCount++;
}
}
if (!withheld)
return consequences;
return consequences.filter(child -> !child.is(WITHHOLD));
}
// setup the child relationship, but do not update childCount
private void registerChild(Action child)
{
assert child.parent == null;
child.parent = this;
registerChildOrigin(child);
if (DEBUG && !register(child, CHILD)) throw new AssertionError();
}
private void registerPseudoOrphan(Action child)
{
assert child.parent == null;
assert child.pseudoParent == null;
child.pseudoParent = this;
registerChildOrigin(child);
}
private void registerChildOrigin(Action child)
{
if (is(Modifier.DISPLAY_ORIGIN)) child.origin = this;
else if (origin != this) child.origin = origin;
}
private boolean register(Object object, RegisteredType type)
{
RegisteredType prev = registered.putIfAbsent(object, type);
if (prev != null && prev != type)
throw new AssertionError();
return prev == null;
}
public void register(ActionListener listener)
{
if (register(listener, LISTENER))
listeners = append(listeners, listener);
}
private boolean deregister(Object object, RegisteredType type)
{
return registered.remove(object, type);
}
public void deregister(ActionListener listener)
{
if (deregister(listener, LISTENER))
listeners = remove(listeners, listener);
}
private void addWithheld(Action action)
{
if (withheld == null)
withheld = new DefaultPriorityQueue<>(Action::compareByPriority, 2);
action.advanceTo(WITHHELD);
action.saveIn(withheld);
}
/**
* Restore withheld (by ourselves or a parent) actions, when no other outstanding actions remain
*/
public ActionList restoreWithheld(ActionList consequences)
{
if (withheld != null && childCount == withheld.size())
{
Action next = withheld.poll();
cleanupWithheld();
consequences = consequences.andThen(next);
}
else if (childCount == 0 && parent != null)
{
Action cur = parent;
while (cur.childCount == 0 && cur.parent != null)
cur = cur.parent;
consequences = cur.restoreWithheld(consequences);
}
return consequences;
}
private void cleanupWithheld()
{
Action cur = this;
if (cur.withheld.isEmpty())
cur.withheld = null;
}
/**
* Invoked once we finish executing ourselves. Typically this occurs immediately after invocation,
* but for SimulatedAction it occurs only once the thread terminates its execution.
*
* In practice this is entirely determined by the {@code isFinished} parameter supplied to
* {@link #performed(ActionList, boolean, boolean)}.
*/
void finishedSelf()
{
if (phase.compareTo(CANCELLED) < 0)
advanceTo(FINISHED);
scheduler = null;
if (withheld != null)
{
Queue<Action> withheld = this.withheld;
this.withheld = null;
withheld.forEach(Action::cancel);
}
Throwable fail = safeForEach(listeners, ActionListener::after, this);
if (childCount == 0)
fail = Throwables.merge(fail, transitivelyFinished());
if (fail != null)
{
invalidate(false);
Throwables.maybeFail(fail);
}
}
/**
* Invoked once all of the consequences of this action, and of those actions (recursively) have completed.
*/
Throwable transitivelyFinished()
{
return transitivelyFinished(this);
}
static Throwable transitivelyFinished(Action cur)
{
Throwable fail = null;
while (true)
{
Action parent = cur.parent;
assert 0 == cur.childCount && cur.isFinished();
if (DEBUG && cur.registered.values().stream().anyMatch(t -> t == CHILD)) throw new AssertionError();
fail = Throwables.merge(fail, safeForEach(cur.listeners, ActionListener::transitivelyAfter, cur));
if (parent == null)
break;
if (DEBUG && CHILD != parent.registered.remove(cur)) throw new AssertionError();
if (--parent.childCount == 0 && parent.isFinished()) cur = parent;
else break;
}
return fail;
}
void orderOn(OrderOn orderOn)
{
this.orderOn = this.orderOn.with(orderOn);
}
void setupOrdering(ActionSchedule schedule)
{
if (orderOn.isOrdered())
{
ordered = orderOn.isStrict() ? new StrictlyOrdered(this, schedule) : new Ordered(this, schedule);
for (int i = 0, maxi = orderOn.size(); i < maxi ; ++i)
ordered.join(orderOn.get(i));
}
}
void advanceTo(Phase phase)
{
Preconditions.checkArgument(phase.compareTo(this.phase) > 0);
this.phase = phase;
}
void addTo(PriorityQueue<Action> byDeadline)
{
Preconditions.checkState(scheduledIndex < 0);
scheduledIn = byDeadline;
byDeadline.add(this);
}
void saveIn(PriorityQueue<Action> saveIn)
{
Preconditions.checkState(savedIndex < 0);
savedIn = saveIn;
saveIn.add(this);
}
void setScheduler(RunnableActionScheduler scheduler)
{
Preconditions.checkState(this.scheduler == null);
Preconditions.checkState(this.phase == NASCENT);
this.scheduler = scheduler;
}
void setConsequence()
{
advanceTo(CONSEQUENCE);
}
void schedule(SimulatedTime time, FutureActionScheduler future)
{
setPriority(time, scheduler.priority());
if (is(THREAD_SIGNAL) || deadline == 0)
{
long newDeadline = deadline == 0 ? time.nanoTime() : deadline;
newDeadline += future.schedulerDelayNanos();
deadline = newDeadline;
time.onTimeEvent("ResetDeadline", newDeadline);
}
}
public void setDeadline(SimulatedTime time, long deadlineNanos)
{
Preconditions.checkState(deadline == 0);
Preconditions.checkArgument(deadlineNanos >= deadline);
deadline = deadlineNanos;
time.onTimeEvent("SetDeadline", deadlineNanos);
}
public void setPriority(SimulatedTime time, double priority)
{
this.priority = priority;
time.onTimeEvent("SetPriority", Double.doubleToLongBits(priority));
}
public long deadline()
{
if (deadline < 0) throw new AssertionError();
return deadline;
}
public double priority()
{
return priority;
}
@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue)
{
if (queue == scheduledIn) return scheduledIndex;
else if (queue == savedIn) return savedIndex;
else return -1;
}
@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i)
{
if (queue == scheduledIn) { scheduledIndex = i; if (i < 0) scheduledIn = null; }
else if (queue == savedIn) { savedIndex = i; if (i < 0) savedIn = null; }
else throw new IllegalStateException();
}
public int compareByDeadline(Action that)
{
return Long.compare(this.deadline, that.deadline);
}
public int compareByPriority(Action that)
{
return Double.compare(this.priority, that.priority);
}
private String describeModifiers()
{
StringBuilder builder = new StringBuilder("[");
for (Modifier modifier : self.contents)
{
if (modifier.displayId == 0)
continue;
if (!transitive.is(modifier)) builder.append(modifier.displayId);
else builder.append(Character.toUpperCase(modifier.displayId));
}
boolean hasTransitiveOnly = false;
for (Modifier modifier : transitive.contents)
{
if (modifier.displayId == 0)
continue;
if (!self.is(modifier))
{
if (!hasTransitiveOnly)
{
hasTransitiveOnly = true;
builder.append('(');
}
builder.append(modifier.displayId);
}
}
if (builder.length() == 1)
return "";
if (hasTransitiveOnly)
builder.append(')');
builder.append(']');
return builder.toString();
}
public String toString()
{
return describeModifiers() + description() + (origin != this ? " for " + origin : "");
}
public String toReconcileString()
{
return this + " at [" + deadline + ',' + priority + ']';
}
private static class StackElement
{
final Action action;
final Deque<Action> children;
private StackElement(Action action)
{
this.action = action;
this.children = new ArrayDeque<>(action.childCount);
for (Map.Entry<Object, RegisteredType> e : action.registered.entrySet())
{
if (e.getValue() == CHILD)
children.add((Action) e.getKey());
}
}
}
public String describeCurrentState()
{
StringBuilder sb = new StringBuilder();
Deque<StackElement> stack = new ArrayDeque<>();
appendCurrentState(sb);
stack.push(new StackElement(this));
while (!stack.isEmpty())
{
StackElement last = stack.peek();
if (last.children.isEmpty())
{
stack.pop();
}
else
{
Action child = last.children.pop();
sb.append('\n');
appendPrefix(stack.size(), sb);
child.appendCurrentState(sb);
stack.push(new StackElement(child));
}
}
return sb.toString();
}
private static void appendPrefix(int count, StringBuilder sb)
{
while (--count >= 0)
sb.append(" |");
}
private void appendCurrentState(StringBuilder sb)
{
if (!isStarted()) sb.append("NOT_STARTED ");
else if (!isFinished()) sb.append("NOT_FINISHED ");
if (childCount > 0)
{
sb.append('(');
sb.append(childCount);
sb.append(") ");
}
if (orderOn.isOrdered())
{
sb.append(orderOn);
sb.append(": ");
}
sb.append(description());
}
}