blob: 895053b13dea5e37c94443393d2019534780a94f [file] [log] [blame]
package accord.verify;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.stream.IntStream;
/**
* Nomenclature:
* register: the values associated with a given key
* step: a logical point in the causal sequence of events for a register
* step index: the index of a step; since we observe a strictly growing sequence this translates directly to
* the length of an observed sequence for a key. This imposes a total causal order for a given register
* (sequences [], [1], [1,2] have step indexes of 0, 1 and 2 respectively, with 2 necessarily happening after 1 and 0)
* predecessor: a step for a referent key that must occur before the referring step and key.
* two kinds: 1) those values for keys [B,C..) at step index i for a read of key A precede key A's step index i+1
* 2) those values for keys [B,C..) at step index i for a write of key A, precede key A's step index i
* max predecessor: the maximum predecessor that may be reached via any predecessor relation
*
* Ensure there are no cycles in the implied list of predecessors, i.e. that we have a serializable order.
* That is, we maintain links to the maximum predecessor step for each key, at each step for each key.
* In combination with a linearizability verifier for each register/partition, we verify strict-serializability.
*
* TODO: find and report a path when we encounter a violation
*/
public class SerializabilityVerifier
{
/**
 * A link to the maximum predecessor node for a given key reachable from the transitive closure of predecessor
 * relations from a given register's observation (i.e. for a given sequence observed for a given key).
 *
 * A predecessor is an absolute happens-before relationship. This is created either:
 * 1) by witnessing some read for key A coincident with a write for key B,
 * therefore the write for key B happened strictly after the write for key A; or
 * 2) any value for key A witnessed alongside a step index i for key B happens before i+1 for key B.
 *
 * For every observation step index i for key A, we have an outgoing MaxPredecessor link to every key.
 * This object both maintains the current maximum predecessor step index reachable via the transitive closure
 * of happens-before relationships, but represents a back-link from that step index for the referred-to key,
 * so that when its own predecessor memoized maximum predecessor values are updated we can propagate them here.
 *
 * In essence, each node in the happens-before graph maintains a link to every possible frontier of its transitive
 * closure in the graph, so that each time that frontier is updated the internal nodes that reference it are updated
 * to the new frontier. This ensures ~computationally optimal maintenance of this transitive closure at the expense
 * of quadratic memory utilisation, but for small numbers of unique keys this remains quite manageable (i.e. steps*keys^2)
 *
 * This graph can be interpreted quite simply: if any step index for a key can reach itself (or a successor) via
 * the transitive closure of happens-before relations then there is a serializability violation.
 */
static class MaxPredecessor implements Comparable<MaxPredecessor>
{
    // intrusive circular doubly-linked list pointers; a node belongs to exactly one list at a time,
    // and a freshly constructed node forms a singleton list (prev == next == this)
    MaxPredecessor prev = this, next = this;
    // the key whose observation this link belongs to (the referring node)
    final int ofKey;
    // the step index (for ofKey) whose observation this link belongs to
    final int ofStepIndex;
    // TODO: we probably don't need this field, as it's implied by the node we point to, that we have when we enqueue refresh
    final int predecessorKey;
    // the maximum step index of predecessorKey known to happen-before (ofKey, ofStepIndex); -1 if none known yet
    int predecessorStepIndex;

    MaxPredecessor(int ofKey, int ofStepIndex, int predecessorKey)
    {
        this.ofKey = ofKey;
        this.ofStepIndex = ofStepIndex;
        this.predecessorKey = predecessorKey;
        this.predecessorStepIndex = -1;
    }

    /**
     * Order by (ofStepIndex, ofKey); this ordering is used by the {@code refresh} TreeSet,
     * so lower step indexes are refreshed first
     */
    @Override
    public int compareTo(MaxPredecessor that)
    {
        if (this.ofStepIndex != that.ofStepIndex) return Integer.compare(this.ofStepIndex, that.ofStepIndex);
        else return Integer.compare(this.ofKey, that.ofKey);
    }

    /**
     * Unlink {@code push} from any list in which it presently resides, and link it to this one
     */
    void push(MaxPredecessor push)
    {
        MaxPredecessor head = this;
        // unlink push from its current list (a singleton node harmlessly relinks to itself)
        push.next.prev = push.prev;
        push.prev.next = push.next;
        // link push to this list, immediately after the head
        push.next = head.next;
        push.prev = head;
        head.next = push;
        push.next.prev = push;
    }

    /**
     * Apply {@code forEach} to each element in this list, excluding the head itself.
     * NOTE: the next pointer is read only after the consumer runs, so a consumer that
     * relocates the node being visited would alter the traversal — current callers only
     * add nodes to the refresh set, which does not modify the list.
     */
    void forEach(Consumer<MaxPredecessor> forEach)
    {
        MaxPredecessor next = this.next;
        while (next != this)
        {
            forEach.accept(next);
            next = next.next;
        }
    }

    @Override
    public String toString()
    {
        return Integer.toString(predecessorStepIndex);
    }
}
/**
 * The graph state for one step of a single key. As a subclass of MaxPredecessor it also acts
 * as the head of the intrusive list of back-links whose maximum predecessor for this key
 * currently points at this step; when this step changes, all of those internal nodes are
 * queued to recompute their own max predecessors.
 */
static class Step extends MaxPredecessor
{
    /**
     * the maximum _step_ of the corresponding key's sequence that was witnessed alongside this step for this key
     */
    final int[] maxPeers;

    /**
     * The maximum _step_ of the corresponding key's sequence that was witnessed by any transitive
     * predecessor of this key for this step; entries are instantiated lazily via {@link #maxPredecessor(int)}.
     */
    final MaxPredecessor[] maxPredecessors;

    /**
     * The next instantiated step for this key. This may be further ahead than stepIndex+1 when the
     * intervening steps have not been witnessed directly; e.g. having witnessed [0] and [0,1,2],
     * step 0's successor is step 2 until [0,1] is witnessed, at which point it becomes step 1.
     */
    Step successor;

    Step(int key, int stepIndex, int keyCount)
    {
        super(key, stepIndex, key);
        int[] peers = new int[keyCount];
        Arrays.fill(peers, -1);
        this.maxPeers = peers;
        this.maxPredecessors = new MaxPredecessor[keyCount];
        // our own key's maximum predecessor is, trivially, the immediately preceding step
        this.predecessorStepIndex = stepIndex - 1;
    }

    /**
     * The maxPredecessor for {@code key}, instantiating it if none currently exists
     */
    MaxPredecessor maxPredecessor(int key)
    {
        MaxPredecessor result = maxPredecessors[key];
        if (result == null)
            maxPredecessors[key] = result = new MaxPredecessor(ofKey, ofStepIndex, key);
        return result;
    }

    @Override
    public String toString()
    {
        return String.format("{%s, %s}", Arrays.toString(maxPeers), Arrays.toString(maxPredecessors));
    }
}
/**
 * The history of observations for a given key, or the set of nodes in the graph of observations for this key.
 *
 * TODO: extend LinearizabilityVerifier
 */
class Register
{
    final int key;
    // the total order sequence for this register
    int[] sequence = new int[0];
    // graph nodes indexed by step index; instantiated lazily and grown on demand
    Step[] steps = new Step[1];
    /**
     * Any write value we don't know the step index for because we did not perform a coincident read;
     * we wait until we witness a read containing the written value, at which point its step index
     * becomes known and the deferred observations are processed. Keyed by the write's value/id.
     */
    Map<Integer, List<Deferred>> deferred = new HashMap<>();

    Register(int key)
    {
        this.key = key;
    }

    /**
     * Merge the newly observed {@code sequence} (plus {@code maybeWrite}, if >= 0, a value written
     * coincident with the read) into the canonical sequence for this register, throwing
     * HistoryViolation if the observation is inconsistent with anything previously witnessed,
     * and processing any deferred writes whose step index has now become known.
     */
    private void updateSequence(int[] sequence, int maybeWrite)
    {
        // both sequences must agree on their common prefix
        for (int i = 0, max = Math.min(sequence.length, this.sequence.length) ; i < max ; ++i)
        {
            if (sequence[i] != this.sequence[i])
                throw new HistoryViolation(key, "Inconsistent sequences: " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence));
        }
        if (this.sequence.length > sequence.length)
        {
            // we already know a longer sequence; any coincident write must match the next known value
            if (maybeWrite >= 0 && maybeWrite != this.sequence[sequence.length])
                throw new HistoryViolation(key, "Inconsistent sequences: " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence) + "+" + maybeWrite);
        }
        else
        {
            if (maybeWrite >= 0)
            {
                // a coincident write extends the observed sequence by one
                sequence = Arrays.copyOf(sequence, sequence.length + 1);
                sequence[sequence.length - 1] = maybeWrite;
            }
            if (sequence.length > this.sequence.length)
            {
                // process any writes deferred because we didn't know what step they occurred on;
                // the value written at index i implies a step index of i + 1 for its transaction
                for (int i = this.sequence.length ; i < sequence.length ; ++i)
                {
                    // BUGFIX: deferred entries are registered under the write's *value*
                    // (see registerDeferred / apply), so they must be looked up by
                    // sequence[i], not by the step index i — values need not equal indexes
                    List<Deferred> deferreds = deferred.remove(sequence[i]);
                    if (deferreds != null)
                    {
                        for (Deferred deferred : deferreds)
                        {
                            deferred.update(key, i + 1);
                            deferred.process(SerializabilityVerifier.this);
                        }
                    }
                }
                this.sequence = sequence;
            }
        }
    }

    /**
     * The graph node for {@code step}, instantiating it if necessary: new nodes are spliced into
     * the successor chain next to the nearest instantiated neighbour, and predecessor knowledge is
     * propagated between the neighbours accordingly.
     */
    Step step(int step)
    {
        if (steps.length <= step)
            steps = Arrays.copyOf(steps, Math.max(step + 1, steps.length * 2));
        if (steps[step] == null)
        {
            steps[step] = new Step(key, step, keyCount);
            // find the nearest instantiated earlier step and insert ourselves after it
            int i = step;
            while (--i >= 0 && steps[i] == null) {}
            if (i >= 0)
            {
                steps[step].successor = steps[i].successor;
                steps[i].successor = steps[step];
                updatePredecessors(steps[step], steps[i], true);
            }
            else
            {
                // no earlier step exists; link to the nearest instantiated later step instead
                i = step;
                while (++i < steps.length && steps[i] == null) {}
                if (i < steps.length)
                {
                    steps[step].successor = steps[i];
                    updatePredecessors(steps[i], steps[step], true);
                }
            }
        }
        return steps[step];
    }

    /**
     * Update the peer and write-predecessor state for the step implied by {@code newPeers[key]},
     * triggering change propagation if anything was raised.
     */
    private void updatePeersAndPredecessors(int[] newPeers, int[][] reads, int[] writes)
    {
        int stepIndex = newPeers[key];
        Step step = step(stepIndex);
        boolean updated = updatePeers(step, newPeers);
        updated |= updatePredecessorsOfWrite(step, reads, writes);
        if (updated)
            onChange(step);
    }

    /**
     * Raise {@code step.maxPeers} element-wise to at least {@code newPeers}; returns true if anything changed
     */
    private boolean updatePeers(Step step, int[] newPeers)
    {
        boolean updated = false;
        for (int key = 0 ; key < keyCount ; ++key)
        {
            int newPeer = newPeers[key];
            int maxPeer = step.maxPeers[key];
            if (newPeer > maxPeer)
            {
                updated = true;
                step.maxPeers[key] = newPeers[key];
            }
        }
        return updated;
    }

    /**
     * Propagate max-predecessor knowledge from {@code fromStep} into {@code updateStep};
     * when {@code includeSelf}, {@code fromStep}'s own witnessed peers also count as predecessors.
     */
    private void updatePredecessors(Step updateStep, Step fromStep, boolean includeSelf)
    {
        boolean updated = false;
        for (int key = 0 ; key < keyCount ; ++key)
        {
            MaxPredecessor newPredecessor = fromStep.maxPredecessors[key];
            int selfPredecessorStepIndex = includeSelf ? fromStep.maxPeers[key] : -1;
            int newPredecessorStepIndex = newPredecessor == null ? selfPredecessorStepIndex
                    : Math.max(selfPredecessorStepIndex, newPredecessor.predecessorStepIndex);
            MaxPredecessor maxPredecessor;
            if (newPredecessorStepIndex >= 0 && newPredecessorStepIndex > (maxPredecessor = updateStep.maxPredecessor(key)).predecessorStepIndex)
            {
                maxPredecessor.predecessorStepIndex = newPredecessorStepIndex;
                // re-home the back-link onto the step it now points to, so future updates
                // to that step are propagated back to updateStep
                registers[key].step(newPredecessorStepIndex).push(maxPredecessor);
                updated = true;
            }
        }
        if (updated)
            onChange(updateStep);
    }

    /**
     * keys that are written as part of the transaction occur with the transaction,
     * so those that are only read must precede them
     */
    private boolean updatePredecessorsOfWrite(Step step, int[][] reads, int[] writes)
    {
        if (writes[key] < 0)
            return false;
        boolean updated = false;
        for (int key = 0 ; key < writes.length ; ++key)
        {
            if (reads[key] == null)
                continue;
            // a read observing n values implies step index n for that key, which must precede this write
            int newPredecessorStepIndex = reads[key].length;
            MaxPredecessor maxPredecessor;
            if (newPredecessorStepIndex > (maxPredecessor = step.maxPredecessor(key)).predecessorStepIndex)
            {
                maxPredecessor.predecessorStepIndex = newPredecessorStepIndex;
                Step fromStep = registers[key].step(newPredecessorStepIndex);
                fromStep.push(maxPredecessor);
                updatePredecessors(step, fromStep, false);
                updated = true;
            }
        }
        return updated;
    }

    /**
     * Invoked whenever {@code step}'s state changes: check for a serializability cycle,
     * queue every back-link pointing at this step for refresh, and propagate to the successor.
     */
    void onChange(Step step)
    {
        // if a step can reach itself (or a later step) through the happens-before closure, we have a cycle
        if (step.maxPredecessor(key).predecessorStepIndex >= step.ofStepIndex)
            throw new HistoryViolation(key, "Cycle detected on key " + key + ", step " + step.ofStepIndex + " " + Arrays.toString(Arrays.copyOf(sequence, step.ofStepIndex)));
        step.forEach(refresh::add);
        if (step.successor != null)
            updatePredecessors(step.successor, step, true);
    }

    /**
     * Buffer {@code deferred} until the step index of the write {@code unknownStepWriteValue} is discovered
     */
    void registerDeferred(int unknownStepWriteValue, Deferred deferred)
    {
        this.deferred.computeIfAbsent(unknownStepWriteValue, ignore -> new ArrayList<>())
                     .add(deferred);
    }

    @Override
    public String toString()
    {
        return Arrays.toString(steps);
    }
}
/**
 * A transaction containing a write with no coincident read of the same key: such a write does not
 * know its position in the register's total order, so its graph updates are buffered here and
 * replayed once some later read reveals the write's step index.
 */
private static class Deferred
{
    final int[] newPeers;
    final int[][] reads;
    final int[] writes;

    Deferred(int[] newPeers, int[][] reads, int[] writes)
    {
        this.newPeers = newPeers;
        this.reads = reads;
        this.writes = writes;
    }

    /**
     * Record the now-known step index for {@code key}
     */
    void update(int key, int step)
    {
        newPeers[key] = step;
    }

    /**
     * Replay the buffered observations for every key whose step index is known
     */
    void process(SerializabilityVerifier verifier)
    {
        for (int key = 0 ; key < newPeers.length ; ++key)
        {
            if (newPeers[key] < 0)
                continue;
            verifier.registers[key].updatePeersAndPredecessors(newPeers, reads, writes);
        }
    }
}
// the number of distinct keys under verification; fixed at construction
final int keyCount;
// per-key observation history and graph nodes
final Register[] registers;
// TODO: use another intrusive list or intrusive tree
// queue of back-links whose memoized max predecessor may be stale and must be recomputed
final TreeSet<MaxPredecessor> refresh = new TreeSet<>();
// [key]->the sequence returned by any read performed in this transaction
final int[][] bufReads;
// [key]->the value of any write performed in this transaction
final int[] bufWrites;
// [key]->the step witnessed with the current transaction (if known)
final int[] bufNewPeerSteps;
public SerializabilityVerifier(int keyCount)
{
this.keyCount = keyCount;
this.bufNewPeerSteps = new int[keyCount];
this.bufWrites = new int[keyCount];
this.bufReads = new int[keyCount][];
this.registers = IntStream.range(0, keyCount)
.mapToObj(Register::new)
.toArray(Register[]::new);
}
/**
 * Start a new set of coincident observations, resetting all per-transaction buffers
 */
public void begin()
{
    Arrays.fill(bufReads, null);
    Arrays.fill(bufWrites, -1);
    Arrays.fill(bufNewPeerSteps, -1);
}
/**
 * Buffer a new read observation.
 *
 * Note that this should EXCLUDE any witnessed write for this key.
 * This is to simplify the creation of direct happens-before edges with observations for other keys
 * that are implied by the witnessing of a write (and is also marginally more efficient).
 */
public void witnessRead(int key, int[] sequence)
{
    if (bufReads[key] != null)
        throw new IllegalStateException("Can buffer only one read observation for each key");
    bufReads[key] = sequence;
    // a coincident write implicitly extends the causal sequence by one, so the step index grows accordingly
    bufNewPeerSteps[key] = sequence.length;
    if (bufWrites[key] >= 0)
        ++bufNewPeerSteps[key];
}
/**
 * Buffer a new write observation
 */
public void witnessWrite(int key, int id)
{
    if (bufWrites[key] >= 0)
        throw new IllegalStateException("Can buffer only one write observation for each key");
    bufWrites[key] = id;
    if (bufReads[key] != null)
    {
        // BUGFIX: mirror witnessRead — when both a read and a write are buffered, the write
        // implicitly extends the read's sequence by one, so the step index is length + 1;
        // previously this omitted the +1, making the result depend on witness call order
        bufNewPeerSteps[key] = bufReads[key].length + 1;
    }
}
/**
 * Apply the pending coincident observations to the verification graph.
 * Reads are merged into each register's canonical sequence; keys whose step index is known are
 * applied immediately; writes with an unknown step index are deferred until a later read reveals it.
 */
public void apply()
{
    // for any write without a coincident read, try to infer its step index from the register's
    // already-known sequence (binarySearch: assumes write values are stored in ascending order);
    // the step index is one past the write's position in the sequence
    for (int k = 0; k < bufReads.length ; ++k)
    {
        if (bufWrites[k] >= 0 && bufReads[k] == null)
        {
            int i = Arrays.binarySearch(registers[k].sequence, bufWrites[k]);
            if (i >= 0)
                bufNewPeerSteps[k] = i + 1;
        }
    }
    Deferred deferred = null;
    for (int k = 0; k < bufReads.length ; ++k)
    {
        if (bufReads[k] != null)
            registers[k].updateSequence(bufReads[k], bufWrites[k]);
        if (bufNewPeerSteps[k] >= 0)
        {
            // step index known: update the graph for this key immediately
            registers[k].updatePeersAndPredecessors(bufNewPeerSteps, bufReads, bufWrites);
        }
        else if (bufWrites[k] >= 0)
        {
            // write with unknown step index: snapshot the buffers once and share the same
            // Deferred across all such keys in this transaction, registered under each write's value
            if (deferred == null)
                deferred = new SerializabilityVerifier.Deferred(bufNewPeerSteps.clone(), bufReads.clone(), bufWrites.clone());
            registers[k].registerDeferred(bufWrites[k], deferred);
        }
    }
    // propagate newly learned predecessor relationships through the transitive closure
    refreshTransitive();
}
/**
 * Drain the refresh queue, recomputing each stale back-link's max predecessor from the step it
 * currently points at; processing an entry may enqueue further entries, which are also drained.
 */
private void refreshTransitive()
{
    MaxPredecessor next;
    while ((next = refresh.pollFirst()) != null)
    {
        Register register = registers[next.ofKey];
        register.updatePredecessors(register.step(next.ofStepIndex),
                                    registers[next.predecessorKey].step(next.predecessorStepIndex), false);
    }
}
}