blob: 8ae2b895c37c8866a53ab267054ea84271280626 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.verify;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.stream.Collectors.joining;
/**
* Nomenclature:
* register: the values associated with a given key
* step: a logical point in the causal sequence of events for a register
* step index: the index of a step; since we observe a strictly growing sequence this translates directly to
* the length of an observed sequence for a key. This imposes a total causal order for a given register
* (sequences [], [1], [1,2] have step indexes of 0, 1 and 2 respectively, with 2 necessarily happening after 1 and 0)
* predecessor: a step for a referent key that must occur before the referring step and key.
* two kinds: 1) those values for keys [B,C..) at step index i for a read of key A, precede key A's step index i+1
* 2) those values for keys [B,C..) at step index i for a write of key A, precede key A's step index i
* max predecessor: the maximum predecessor that may be reached via any predecessor relation
* <p>
* Ensure there are no timestamp cycles in the implied list of predecessors, i.e. that we have a strict serializable order.
* That is, we maintain links to the maximum predecessor step for each key, at each step for each key, and see if we can
* find a path of predecessors that would witness us.
* <p>
* TODO (low priority): find and report a path when we encounter a violation
*/
public class StrictSerializabilityVerifier
{
private static final Logger logger = LoggerFactory.getLogger(StrictSerializabilityVerifier.class);
/**
 * A link to the maximum predecessor node for a given key reachable from the transitive closure of predecessor
 * relations from a given register's observation (i.e. for a given sequence observed for a given key).
 * <p>
 * A predecessor is an absolute happens-before relationship. This is created either:
 * 1) by witnessing some read for key A coincident with a write for key B,
 * therefore the write for key B happened strictly after the write for key A; or
 * 2) any value for key A witnessed alongside a step index i for key B happens before i+1 for key B.
 * <p>
 * For every observation step index i for key A, we have an outgoing MaxPredecessor link to every key.
 * This object both maintains the current maximum predecessor step index reachable via the transitive closure
 * of happens-before relationships, and represents a back-link from that step index for the referred-to key,
 * so that when its own memoized maximum predecessor values are updated we can propagate them here.
 * <p>
 * In essence, each node in the happens-before graph maintains a link to every possible frontier of its transitive
 * closure in the graph, so that each time that frontier is updated the internal nodes that reference it are updated
 * to the new frontier. This ensures ~computationally optimal maintenance of this transitive closure at the expense
 * of quadratic memory utilisation, but for small numbers of unique keys this remains quite manageable (i.e. steps*keys^2)
 * <p>
 * This graph can be interpreted quite simply: if any step index for a key can reach itself (or a successor) via
 * the transitive closure of happens-before relations then there is a serializability violation.
 * <p>
 * Every node is also an element of an intrusive circular doubly-linked list ({@link #prev}/{@link #next});
 * a node that is not linked anywhere is a list containing only itself.
 */
static class MaxPredecessor implements Comparable<MaxPredecessor>
{
    // intrusive circular list links; initially self-referential (a list of one)
    MaxPredecessor prev = this, next = this;

    // the key we are tracking predecessors for
    final int ofKey;

    // the step we are tracking predecessors for
    Step ofStep;

    // TODO (low priority): we probably don't need this field, as it's implied by the node we point to, that we have when we enqueue refresh
    // the key we are tracking the maximum predecessor for
    final int predecessorKey;

    // the step of the predecessor we are tracking with this node; null until one has been recorded
    Step predecessorStep;

    MaxPredecessor(int ofKey, Step ofStep, int predecessorKey)
    {
        this.ofKey = ofKey;
        this.ofStep = ofStep;
        this.predecessorKey = predecessorKey;
    }

    /**
     * Orders nodes by the step index they belong to, then by the key they belong to,
     * then by the key whose predecessor they track.
     * <p>
     * These nodes are stored in a {@code TreeSet}, which treats a comparison result of zero as "same element"
     * and drops the second node. The previous implementation returned zero for nodes belonging to different
     * steps that happened to share a step index (it only fell back to {@code ofKey} when both nodes had the
     * very same {@code ofStep}), and for all nodes of one step regardless of predecessor key, so queued
     * refresh work could be silently discarded. This ordering is a strict refinement of the previous one:
     * any pair that previously compared non-zero still compares with the same sign.
     * <p>
     * Nodes that agree on all three components still compare as equal.
     *
     * @param that the node to compare with; its {@code ofStep} must be non-null, as must this node's
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(MaxPredecessor that)
    {
        int c = Integer.compare(this.ofStep.ofStepIndex, that.ofStep.ofStepIndex);
        if (c == 0)
            c = Integer.compare(this.ofKey, that.ofKey);
        if (c == 0)
            c = Integer.compare(this.predecessorKey, that.predecessorKey);
        return c;
    }

    /**
     * Unlink {@code push} from any list in which it presently resides, and link it to this one,
     * immediately after this node (which acts as the list head).
     */
    void push(MaxPredecessor push)
    {
        MaxPredecessor head = this;
        // unlink push from its current list (a no-op rewire when push is a list of one)
        push.next.prev = push.prev;
        push.prev.next = push.next;
        // link push to this list
        push.next = head.next;
        push.prev = head;
        head.next = push;
        push.next.prev = push;
    }

    /**
     * Apply {@code forEach} to each element in this list, excluding this node itself (the head).
     * The next link is read only after the callback returns.
     */
    void forEach(Consumer<MaxPredecessor> forEach)
    {
        MaxPredecessor next = this.next;
        while (next != this)
        {
            forEach.accept(next);
            next = next.next;
        }
    }

    /**
     * @return the tracked predecessor's step index, or the empty string if no predecessor is recorded
     */
    @Override
    public String toString()
    {
        return predecessorStep == null ? "" : "" + predecessorStep.ofStepIndex;
    }
}
/**
 * A predecessor link from {@code successor} to a write whose step index has not been determined;
 * the link's predecessor step is the holder's placeholder step, and the holder itself is retained
 * so the link can be traced back to the pending write.
 */
static class UnknownStepPredecessor extends MaxPredecessor
{
    final UnknownStepHolder holder;

    UnknownStepPredecessor(Step successor, UnknownStepHolder holder)
    {
        // ofKey/ofStep describe the successor; predecessorKey is the key of the pending write
        super(successor.ofKey, successor, holder.step.ofKey);
        this.predecessorStep = holder.step;
        this.holder = holder;
    }
}
/**
 * Tracks a write whose step index is not yet known, together with the steps of other keys
 * ({@link #peers}) that were registered alongside it. The holder installs itself as the
 * {@code onChange} callback of its step.
 */
class UnknownStepHolder implements Runnable
{
    final List<Step> peers = new ArrayList<>();
    final int writeValue;
    final int start;
    final int end;
    final Step step;

    UnknownStepHolder(int writeValue, int start, int end, Step step)
    {
        this.writeValue = writeValue;
        this.start = start;
        this.end = end;
        this.step = step;
        step.onChange = this;
    }

    /**
     * Called once the write's position in its register's sequence is known: assigns the index to the
     * step, detaches this holder as the step's callback, inserts the step into its register, and raises
     * the max peer of every registered peer that had not yet seen a step this high for the key.
     */
    void discoveredStepIndex(int stepIndex)
    {
        final int key = step.ofKey;
        final Register register = registers[key];

        step.ofStepIndex = stepIndex;
        step.onChange = null;
        register.insert(step);

        for (Step peer : peers)
        {
            // peers that already witnessed an equal or later step for this key are left untouched
            if (peer.maxPeers[key] >= stepIndex)
                continue;

            List<UnknownStepHolder> pending = peer.unknownStepPeers;
            pending.remove(this);
            if (pending.isEmpty())
                peer.unknownStepPeers = null;

            peer.maxPeers[key] = stepIndex;
            registers[peer.ofKey].onChange(peer);
        }

        register.onChange(step);
    }

    @Override
    public String toString()
    {
        StringBuilder description = new StringBuilder();
        description.append("{key:").append(step.ofKey);
        description.append(", value:").append(writeValue);
        description.append('}');
        return description.toString();
    }

    /**
     * Callback invoked when the held step changes: notifies the register of every peer step.
     */
    @Override
    public void run()
    {
        for (Step peer : peers)
        {
            Register peerRegister = registers[peer.ofKey];
            peerRegister.onChange(peer);
        }
    }
}
/**
* Represents the graph state for a step for a single key, and maintains backwards references to all
* internal nodes of the graph whose maximum predecessor for this key points to this step.
* When this step is updated, we queue all of these internal nodes to update their own max predecessors.
* <p>
* A Step is itself a {@link MaxPredecessor} whose {@code ofStep} is the step itself and whose
* {@code predecessorKey} is its own key; its inherited list links form the head of the list of
* back-references described above.
*/
static class Step extends MaxPredecessor
{
/**
* the maximum _step_ of the corresponding key's sequence that was witnessed alongside this step for this key
* (indexed by key; -1 means nothing has been witnessed for that key)
*/
final int[] maxPeers;
/**
* The maximum _step_ of the corresponding key's sequence that was witnessed by any transitive predecessor of this key for this step.
* That is, if we look at the directly preceding step for this key (which must by definition precede this step) and explore all of
* its predecessors in the same manner, what is the highest step we can reach for each key.
* Entries are created lazily by {@link #maxPredecessor(int)} and may be null.
*/
final MaxPredecessor[] maxPredecessors;
// TODO (low priority): cleanup
// writes with an unknown step index that were registered alongside this step; null when there are none
List<UnknownStepHolder> unknownStepPeers;
// unknown-step writes known to precede this step, keyed by their placeholder step; null when there are none
Map<Step, UnknownStepPredecessor> unknownStepPredecessors;
// optional callback run by Register.onChange after this step has been processed
Runnable onChange;
// the value written by this step
final int writeValue;
// this step's index in its register's sequence; mutable because it is assigned late for unknown-step writes
int ofStepIndex;
/**
* The next instantiated sequence's observation.
* This may not be stepIndex+1, if we have not witnessed stepIndex+1 directly.
* i.e. if we have witnessed [0] and [0,1,2] then 0's successor will be 2.
* If we later witness [0,1], 0's successor will be updated to 1, whose successor will be 2.
*/
Step successor;
// the highest point we MUST have witnessed this step
int witnessedUntil = Integer.MIN_VALUE;
// the highest possible time the write for this step could have occurred
int writtenBefore = Integer.MAX_VALUE;
// the lowest possible time the write for this step could have occurred
int writtenAfter = Integer.MIN_VALUE;
// the largest writtenAfter seen on any step recorded as a max predecessor of this step
int maxPredecessorWrittenAfter = Integer.MIN_VALUE;
/**
* @param key the key this step belongs to
* @param stepIndex the index of this step in the key's sequence
* @param keyCount the total number of keys; sizes the per-key arrays
* @param writeValue the value written by this step
*/
Step(int key, int stepIndex, int keyCount, int writeValue)
{
// ofStep cannot reference 'this' before super() returns, so it is passed as null and patched below
super(key, null, key);
this.writeValue = writeValue;
this.ofStep = this;
this.ofStepIndex = stepIndex;
this.maxPeers = new int[keyCount];
Arrays.fill(maxPeers, -1);
this.maxPredecessors = new MaxPredecessor[keyCount];
}
/**
* Clears the per-key peer and predecessor arrays and restores the timing bounds to their initial values.
* The unknown-step collections, successor link and onChange callback are not modified.
*/
void reset()
{
Arrays.fill(maxPeers, -1);
Arrays.fill(maxPredecessors, null);
maxPredecessorWrittenAfter = Integer.MIN_VALUE;
witnessedUntil = Integer.MIN_VALUE;
// the highest possible time the write for this step could have occurred
writtenBefore = Integer.MAX_VALUE;
// the lowest possible time the write for this step could have occurred
writtenAfter = Integer.MIN_VALUE;
}
/**
* Records that this step was observed by an operation spanning [{@code start}, {@code end}].
* Raises {@code witnessedUntil} to {@code start} and lowers {@code writtenBefore} to {@code end};
* when {@code isWrite}, also raises {@code writtenAfter} to {@code start}.
*
* @return true if any of the bounds changed
* @throws HistoryViolation if {@code isWrite} and the lower write bound now exceeds the upper write bound
*/
boolean witnessedBetween(int start, int end, boolean isWrite)
{
boolean updated = false;
if (start > witnessedUntil)
{
witnessedUntil = start;
updated = true;
}
if (end < writtenBefore)
{
updated = true;
writtenBefore = end;
}
if (isWrite)
{
if (start > writtenAfter)
{
updated = true;
writtenAfter = start;
// TODO (low priority): double check this will trigger an update of maxPredecessorX properties on each node with us as a maxPredecessor
}
if (writtenAfter > writtenBefore)
throw new HistoryViolation(ofKey, "Write operation time conflicts with earlier read");
}
return updated;
}
/**
* The maxPredecessor for {@code key}, instantiating it if none currently exists
*/
MaxPredecessor maxPredecessor(int key)
{
if (maxPredecessors[key] == null)
maxPredecessors[key] = new MaxPredecessor(ofKey, this, key);
return maxPredecessors[key];
}
/**
* Links {@code successor} after this step and records this step as the successor's predecessorStep.
*/
void setSuccessor(Step successor)
{
this.successor = successor;
successor.predecessorStep = this;
}
/**
* Raises {@code maxPeers} to the given peer steps where they are higher, and registers any
* unknown-step writes (other than this step's own) as peers in both directions.
*
* @param newPeerSteps per-key step index witnessed alongside this step
* @param newBlindWrites per-key unknown-step holders, or null if there are none
* @return true if any {@code maxPeers} entry was raised; registering unknown steps alone does not count
*/
boolean updatePeers(int[] newPeerSteps, UnknownStepHolder[] newBlindWrites)
{
boolean updated = false;
for (int key = 0 ; key < newPeerSteps.length ; ++key)
{
int newPeer = newPeerSteps[key];
int maxPeer = maxPeers[key];
if (newPeer > maxPeer)
{
updated = true;
maxPeers[key] = newPeerSteps[key];
}
if (newBlindWrites != null)
{
UnknownStepHolder unknownStep = newBlindWrites[key];
if (unknownStep != null && unknownStep.step != this)
{
if (unknownStepPeers == null)
unknownStepPeers = new ArrayList<>();
unknownStepPeers.add(unknownStep);
unknownStep.peers.add(this);
}
}
}
return updated;
}
/**
* keys that are written as part of the transaction occur with the transaction,
* so those that are only read must precede them
* <p>
* Does nothing unless this step's key was written ({@code writes[ofKey] >= 0}). For each key that was
* read, the step whose index equals the length of the sequence read becomes this step's max predecessor
* for that key if it is higher than the one currently recorded.
*
* @return true if any max predecessor was advanced
*/
boolean updatePredecessorsOfWrite(int[][] reads, int[] writes, StrictSerializabilityVerifier verifier)
{
if (writes[ofKey] < 0)
return false;
boolean updated = false;
for (int key = 0 ; key < reads.length ; ++key)
{
if (reads[key] == null)
continue;
int newPredecessorStepIndex = reads[key].length;
MaxPredecessor maxPredecessor = maxPredecessor(key);
if (maxPredecessor.predecessorStep == null || newPredecessorStepIndex > maxPredecessor.predecessorStep.ofStepIndex)
{
Step newPredecessorStep = verifier.registers[key].step(newPredecessorStepIndex);
maxPredecessor.predecessorStep = newPredecessorStep;
if (newPredecessorStep.writtenAfter > maxPredecessorWrittenAfter)
maxPredecessorWrittenAfter = newPredecessorStep.writtenAfter;
// move our link onto the new predecessor's list so its future changes are propagated back to us
newPredecessorStep.push(maxPredecessor);
// also inherit whatever the new predecessor already knows (return value intentionally unused here)
receiveKnowledgePhasedPredecessors(newPredecessorStep, verifier);
updated = true;
}
}
return updated;
}
/**
* Merges the predecessor knowledge held by {@code propagate} into this step: for every key, adopts
* the higher of {@code propagate}'s max predecessor and (only when {@code propagate} is a step of this
* same key) {@code propagate}'s max peer, if that exceeds what is currently recorded here.
* Unknown-step peers and predecessors of {@code propagate} are inherited as unknown-step predecessors.
*
* @return true if any max predecessor was advanced or any unknown-step predecessor was added
*/
boolean receiveKnowledgePhasedPredecessors(Step propagate, StrictSerializabilityVerifier verifier)
{
// once propagate has a real step index it no longer needs tracking as an unknown-step predecessor
if (unknownStepPredecessors != null && propagate.ofStepIndex < Integer.MAX_VALUE && unknownStepPredecessors.containsKey(propagate))
{
unknownStepPredecessors.remove(propagate);
if (unknownStepPredecessors.isEmpty())
unknownStepPredecessors = null;
}
boolean updated = false;
for (int key = 0 ; key < maxPredecessors.length ; ++key)
{
MaxPredecessor newMaxPredecessor = propagate.maxPredecessors[key];
// we use maxPeers here because anything we witness coincident
// with a direct predecessor of this sequence must have preceded us
Step selfPredecessor = propagate.ofKey == ofKey && propagate.maxPeers[key] >= 0
? verifier.registers[key].step(propagate.maxPeers[key]) : null;
if ((newMaxPredecessor == null || newMaxPredecessor.predecessorStep == null) && selfPredecessor == null)
continue;
// pick whichever candidate exists, preferring the one with the higher step index when both do
Step newPredecessor;
if (newMaxPredecessor == null) newPredecessor = selfPredecessor;
else if (newMaxPredecessor.predecessorStep == null) newPredecessor = selfPredecessor;
else if (selfPredecessor == null) newPredecessor = newMaxPredecessor.predecessorStep;
else if (selfPredecessor.ofStepIndex > newMaxPredecessor.predecessorStep.ofStepIndex) newPredecessor = selfPredecessor;
else newPredecessor = newMaxPredecessor.predecessorStep;
MaxPredecessor maxPredecessor = maxPredecessor(key);
Step oldPredecessor = maxPredecessor.predecessorStep;
if (oldPredecessor == null || oldPredecessor.ofStepIndex < newPredecessor.ofStepIndex)
{
maxPredecessor.predecessorStep = newPredecessor;
if (newPredecessor.writtenAfter > maxPredecessorWrittenAfter)
maxPredecessorWrittenAfter = newPredecessor.writtenAfter;
newPredecessor.push(maxPredecessor);
updated = true;
}
}
// unknown-step peers are only inherited from a step of our own key
if (propagate.ofKey == ofKey && propagate.unknownStepPeers != null)
{
for (UnknownStepHolder unknownStep : propagate.unknownStepPeers)
updated |= receiveUnknownStepPredecessor(unknownStep, verifier);
}
if (propagate.unknownStepPredecessors != null)
{
for (UnknownStepPredecessor unknownStepPredecessor : propagate.unknownStepPredecessors.values())
updated |= receiveUnknownStepPredecessor(unknownStepPredecessor.holder, verifier);
}
return updated;
}
/**
* Records the write held by {@code unknownStep} as a predecessor of this step, unless already recorded,
* and then inherits that write's own predecessor knowledge.
*
* @return true if the predecessor was newly recorded, false if it was already present
* @throws HistoryViolation if the unknown write is this very step, i.e. it would precede itself
*/
boolean receiveUnknownStepPredecessor(UnknownStepHolder unknownStep, StrictSerializabilityVerifier verifier)
{
if (unknownStep.step == this)
throw new HistoryViolation(ofKey, "Unknown write step on key " + ofKey + " with value " + unknownStep.writeValue + " is reachable from its happens-before relations");
if (unknownStepPredecessors == null)
unknownStepPredecessors = new LinkedHashMap<>();
if (unknownStepPredecessors.containsKey(unknownStep.step))
return false;
UnknownStepPredecessor predecessor = new UnknownStepPredecessor(this, unknownStep);
unknownStep.step.push(predecessor);
unknownStepPredecessors.put(unknownStep.step, predecessor);
receiveKnowledgePhasedPredecessors(unknownStep.step, verifier);
return true;
}
/**
* Debug rendering: key, the write value (only when the step index is negative), max peers,
* max predecessors and any unknown-step peers/predecessors.
*/
@Override
public String toString()
{
StringBuilder builder = new StringBuilder("{key: " + ofKey);
if (ofStepIndex < 0)
builder.append(", value: ").append(writeValue);
builder.append(", peers:").append(Arrays.toString(maxPeers))
.append(", preds:").append(Arrays.toString(maxPredecessors))
.append(unknownStepPeers != null ? ", peers?:" + unknownStepPeers : "")
.append(unknownStepPredecessors != null ? ", preds?:" + unknownStepPredecessors.values() : "")
.append("}");
return builder.toString();
}
}
// TODO (expected, testing): rethink this concept - should it be a Step? Does it provide any value? If not, it's more confusing.
class FutureWrites extends Step
{
/**
* Any write value we don't know the step index for because we did not perform a coincident read;
* we wait until we witness a read containing the
* <p>
* TODO: report a violation if we have witnessed a sequence missing any of these deferred operations
* that started after they finished
* <p>
* TODO: handle writes with unknown outcome
*/
final Map<Integer, UnknownStepHolder> byWriteValue = new HashMap<>();
final TreeMap<Integer, UnknownStepHolder> byTimestamp = new TreeMap<>();
FutureWrites(int key, int keyCount)
{
super(key, Integer.MAX_VALUE, keyCount, Integer.MIN_VALUE);
}
void newSequence(int[] oldSequence, int[] newSequence)
{
boolean updated = false;
for (int i = oldSequence.length ; i < newSequence.length ; ++i)
{
UnknownStepHolder unknownStep = byWriteValue.remove(newSequence[i]);
if (unknownStep != null)
{
updated = true;
unknownStep.discoveredStepIndex(i + 1);
byTimestamp.remove(unknownStep.end);
}
}
if (updated && byWriteValue.isEmpty())
reset();
else if (updated)
recompute();
}
@Override
boolean witnessedBetween(int start, int end, boolean isWrite)
{
return false;
}
@Override
boolean updatePeers(int[] newPeers, UnknownStepHolder[] unknownSteps)
{
return false;
}
@Override
boolean receiveKnowledgePhasedPredecessors(Step propagate, StrictSerializabilityVerifier verifier)
{
for (UnknownStepHolder holder : byWriteValue.values())
holder.step.receiveKnowledgePhasedPredecessors(propagate, verifier);
return super.receiveKnowledgePhasedPredecessors(propagate, verifier);
}
@Override
boolean updatePredecessorsOfWrite(int[][] reads, int[] writes, StrictSerializabilityVerifier verifier)
{
// TODO (required): this evidently wasn't carefully considered at the time, and removing it improves things
// but what if anything this trying to achieve?
// probably we DO want to update the predecessors of any NEW step. perhaps we also want
// and we probably want some additional unit tests around blind writes
// for (UnknownStepHolder holder : byWriteValue.values())
// holder.step.updatePredecessorsOfWrite(reads, writes, verifier);
// return super.updatePredecessorsOfWrite(reads, writes, verifier);
return false;
}
void recompute()
{
writtenBefore = Integer.MAX_VALUE;
writtenAfter = Integer.MIN_VALUE;
witnessedUntil = Integer.MIN_VALUE;
for (UnknownStepHolder deferred : byTimestamp.values())
witnessedBetween(deferred.start, deferred.end, true);
}
void register(UnknownStepHolder[] newBlindWrites, int start, int end)
{
UnknownStepHolder unknownStep = newBlindWrites[ofKey];
if (null != byWriteValue.putIfAbsent(unknownStep.writeValue, unknownStep))
throw new AssertionError();
if (null != byTimestamp.putIfAbsent(end, unknownStep))
throw new AssertionError();
// TODO (desired, testing): verify this propagation by unit test
newBlindWrites[ofKey].step.receiveKnowledgePhasedPredecessors(this, StrictSerializabilityVerifier.this);
}
public void checkForUnwitnessed(int start)
{
if (byTimestamp.lowerEntry(start) != null)
{
Collection<UnknownStepHolder> notWitnessed = byTimestamp.headMap(start, false).values();
throw new HistoryViolation(ofKey, "Writes not witnessed: " + notWitnessed);
}
}
public String toString()
{
return "FutureWrites:[" + byTimestamp.values().stream().map(s -> s.writeValue).map(Object::toString).collect(joining(",")) + ']';
}
}
/**
* The history of observations for a given key, or the set of nodes in the graph of observations for this key.
* <p>
* Holds the longest sequence observed so far, a sparse array of instantiated steps indexed by step index,
* and the {@link FutureWrites} pseudo-step that terminates the chain of successors.
*/
class Register
{
final int key;
// the total order sequence for this register
int[] sequence = new int[0];
// instantiated steps, indexed by step index; entries are null until a step is first needed
Step[] steps = new Step[1];
final FutureWrites futureWrites;
Register(int key)
{
this.key = key;
this.futureWrites = new FutureWrites(key, keyCount);
}
/**
* Logs (at error level) every write of this register whose step index is still unknown.
*/
void print()
{
for (Map.Entry<Integer, UnknownStepHolder> e : futureWrites.byWriteValue.entrySet())
logger.error("{}: {} -> ({}, {}, {})", key, e.getKey(), e.getValue().writeValue, e.getValue().start, e.getValue().end);
}
/**
* Validates a newly observed sequence (plus an optional coincident write) against the known sequence,
* and extends the known sequence if the observation is longer.
*
* @param sequence the sequence read for this key
* @param maybeWrite the value written to this key by the same operation, or negative if none
* @throws HistoryViolation if the observation disagrees with the known sequence, or the write value
* already appears in the sequence that was read
*/
private void updateSequence(int[] sequence, int maybeWrite)
{
// the shared prefix of the two sequences must match exactly
for (int i = 0, max = Math.min(sequence.length, this.sequence.length) ; i < max ; ++i)
{
if (sequence[i] != this.sequence[i])
throw new HistoryViolation(key, "Inconsistent sequences on " + key + ": " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence));
}
if (this.sequence.length > sequence.length)
{
// we already know what follows the observed prefix, so any write must be exactly that next element
if (maybeWrite >= 0 && maybeWrite != this.sequence[sequence.length])
throw new HistoryViolation(key, "Inconsistent sequences on " + key + ": " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence) + "+" + maybeWrite);
}
else
{
if (maybeWrite >= 0)
{
if (IntStream.of(sequence).anyMatch(i -> i == maybeWrite))
throw new HistoryViolation(key, "Attempted to write " + maybeWrite + " which is already found in the seq; seq=" + Arrays.toString(sequence));
// the write extends the observed sequence by one element
sequence = Arrays.copyOf(sequence, sequence.length + 1);
sequence[sequence.length - 1] = maybeWrite;
}
if (sequence.length > this.sequence.length)
{
// let pending writes discover their step index before the longer sequence is adopted
futureWrites.newSequence(this.sequence, sequence);
this.sequence = sequence;
}
}
}
/**
* The step with the given index, instantiating (and linking) it if it does not exist yet.
* A newly created step takes the write value {@code sequence[step - 1]}, or -1 for step 0.
*/
Step step(int step)
{
if (steps.length <= step)
steps = Arrays.copyOf(steps, Math.max(step + 1, steps.length * 2));
if (steps[step] == null)
insert(new Step(key, step, keyCount, step == 0 ? -1 : sequence[step - 1]));
return steps[step];
}
/**
* Stores {@code step} at its step index and links it into the successor chain: after the nearest
* instantiated earlier step if there is one, otherwise before the nearest instantiated later step,
* or before {@code futureWrites} if there is none.
*/
void insert(Step step)
{
int i = step.ofStepIndex;
if (i >= steps.length)
steps = Arrays.copyOf(steps, Math.max(i + 1, steps.length * 2));
steps[i] = step;
// scan backwards for the nearest instantiated earlier step
while (--i >= 0 && steps[i] == null) {}
if (i >= 0)
{
// splice between the earlier step and that step's previous successor
step.setSuccessor(steps[i].successor);
steps[i].setSuccessor(step);
propagateToDirectSuccessor(steps[i], step);
}
else
{
i = step.ofStepIndex;
// no earlier step exists: scan forwards for the nearest instantiated later step
while (++i < steps.length && steps[i] == null) {}
Step successor = i < steps.length ? steps[i] : futureWrites;
step.setSuccessor(successor);
propagateToDirectSuccessor(step, successor);
}
}
/**
* Applies one operation's observations to the step of this register that the operation witnessed:
* the step at {@code newPeerSteps[key]} if known, otherwise the placeholder step of the blind write
* (which is first registered with {@code futureWrites}).
*
* @throws IllegalStateException if neither a witnessed step nor a blind write exists for this key
*/
private void updatePeersAndPredecessors(int[] newPeerSteps, int[][] reads, int[] writes, int start, int end, UnknownStepHolder[] newBlindWrites)
{
Step step;
boolean updated;
if (newPeerSteps[key] >= 0)
{
step = step(newPeerSteps[key]);
}
else if (newBlindWrites != null && newBlindWrites[key] != null)
{
assert writes[key] >= 0;
futureWrites.register(newBlindWrites, start, end);
step = newBlindWrites[key].step;
}
else
{
throw new IllegalStateException();
}
// non-short-circuit |= so that all three updates are always applied
updated = step.updatePeers(newPeerSteps, newBlindWrites);
updated |= step.updatePredecessorsOfWrite(reads, writes, StrictSerializabilityVerifier.this);
updated |= step.witnessedBetween(start, end, writes[key] >= 0);
if (updated)
onChange(step);
}
/**
* Pushes {@code predecessor}'s knowledge to its direct successor in this register, and raises the
* successor's {@code writtenAfter} to the predecessor's {@code witnessedUntil} if that is later.
*/
private void propagateToDirectSuccessor(Step predecessor, Step successor)
{
boolean updated = successor.receiveKnowledgePhasedPredecessors(predecessor, StrictSerializabilityVerifier.this);
if (predecessor.witnessedUntil > successor.writtenAfter)
{
successor.writtenAfter = predecessor.witnessedUntil;
updated = true;
}
if (updated)
onChange(successor);
}
/**
* Pushes {@code propagate}'s knowledge to {@code successor}, without adjusting any timing bound.
*/
private void propagateToSuccessor(Step propagate, Step successor)
{
if (successor.receiveKnowledgePhasedPredecessors(propagate, StrictSerializabilityVerifier.this))
onChange(successor);
}
/**
* Validates {@code step} after a change and schedules or performs propagation to everything that
* depends on it.
*
* @throws HistoryViolation if the step can reach itself or a later step of its own key, or if its
* timing bounds are contradictory
*/
void onChange(Step step)
{
// note: maxPredecessor(key) instantiates the link for this key if it does not exist yet
if (step.maxPredecessor(key).predecessorStep != null && step.maxPredecessor(key).predecessorStep.ofStepIndex >= step.ofStepIndex)
throw new HistoryViolation(key, "Cycle detected on key " + key + ", step " + step.ofStepIndex + " " + Arrays.toString(Arrays.copyOf(sequence, step.ofStepIndex)));
if (step.writtenBefore < step.writtenAfter)
throw new HistoryViolation(key, key + ": timestamp inconsistency, step " + step.ofStepIndex);
if (step.maxPredecessorWrittenAfter > step.writtenBefore)
throw new HistoryViolation(key, key + " must have been written prior to its maximum predecessor in real-time order on step " + step.ofStepIndex);
// refresh all successors (those where we're their max predecessor)
step.forEach(refresh::add);
if (step.successor != null)
propagateToDirectSuccessor(step, step.successor);
if (step.onChange != null)
{
// prevent reentrancy
Runnable run = step.onChange;
step.onChange = null;
run.run();
step.onChange = run;
}
}
/**
* Delegates to {@code futureWrites.checkForUnwitnessed(start)}.
*/
void checkForUnwitnessed(int start)
{
futureWrites.checkForUnwitnessed(start);
}
@Override
public String toString()
{
return Arrays.toString(steps);
}
}
// the number of keys (registers) tracked by this verifier
final int keyCount;
// one register per key, indexed by key
final Register[] registers;
// TODO (low priority): use another intrusive list or intrusive tree
// nodes queued for re-propagation; drained by refreshTransitive()
final TreeSet<MaxPredecessor> refresh = new TreeSet<>();
// [key]->the sequence returned by any read performed in this transaction
final int[][] bufReads;
// [key]->the value of any write performed in this transaction
final int[] bufWrites;
// [key]->the step witnessed with the current transaction (if known)
final int[] bufNewPeerSteps;
// [key]->the holder created by apply() for a write in this transaction whose step index is unknown
final UnknownStepHolder[] bufUnknownSteps;
// TODO (desired, testing): verify operations with unknown outcomes are finalised by the first operation that starts after the coordinator abandons the txn
public StrictSerializabilityVerifier(int keyCount)
{
this.keyCount = keyCount;
this.bufNewPeerSteps = new int[keyCount];
this.bufWrites = new int[keyCount];
this.bufUnknownSteps = new UnknownStepHolder[keyCount];
this.bufReads = new int[keyCount][];
this.registers = IntStream.range(0, keyCount)
.mapToObj(Register::new)
.toArray(Register[]::new);
}
/**
 * Begins a fresh set of coincident observations by clearing every per-key buffer:
 * reads and unknown steps become {@code null}, writes and witnessed steps become -1.
 */
public void begin()
{
    Arrays.fill(bufReads, null);
    Arrays.fill(bufUnknownSteps, null);
    Arrays.fill(bufNewPeerSteps, -1);
    Arrays.fill(bufWrites, -1);
}
/**
 * Buffer a new read observation.
 * <p>
 * Note that this should EXCLUDE any witnessed write for this key.
 * This is to simplify the creation of direct happens-before edges with observations for other keys
 * that are implied by the witnessing of a write (and is also marginally more efficient).
 *
 * @param key the key that was read
 * @param sequence the sequence observed for the key
 * @throws IllegalStateException if a read has already been buffered for {@code key}
 */
public void witnessRead(int key, int[] sequence)
{
    if (bufReads[key] != null)
        throw new IllegalStateException("Can buffer only one read observation for each key");

    bufReads[key] = sequence;

    int witnessedStep = sequence.length;
    // if we have a write, then for causality sequence is implicitly longer by one to include the write
    if (bufWrites[key] >= 0)
        witnessedStep++;
    bufNewPeerSteps[key] = witnessedStep;
}
/**
 * Buffer a new write observation.
 *
 * @param key the key that was written
 * @param id the value written; a negative value is indistinguishable from "no write buffered"
 * @throws IllegalStateException if a write has already been buffered for {@code key}
 */
public void witnessWrite(int key, int id)
{
    if (bufWrites[key] >= 0)
        throw new IllegalStateException("Can buffer only one write observation for each key");

    bufWrites[key] = id;

    // a read buffered earlier for the same key is implicitly one step longer once the write is included
    int[] read = bufReads[key];
    if (read != null)
        bufNewPeerSteps[key] = read.length + 1;
}
/**
 * Apply the pending coincident observations that occurred between {@code start} and {@code end}
 * to the verification graph.
 * <p>
 * Proceeds in four phases: (1) resolve blind writes (a write with no read of the same key) to a step
 * index if the value is already in the register's sequence, otherwise create an unknown-step holder;
 * (2) validate and extend each register's sequence with what was read; (3) update peers, predecessors
 * and timing bounds for every key touched, checking for unwitnessed writes on keys that were read;
 * (4) drain the queue of nodes awaiting re-propagation.
 *
 * @param start the time at which the operation started
 * @param end the time at which the operation ended
 * @throws HistoryViolation if the observations are inconsistent with the history recorded so far
 */
public void apply(int start, int end)
{
    boolean hasUnknownSteps = false;
    for (int k = 0; k < bufReads.length ; ++k)
    {
        if (bufWrites[k] < 0 || bufReads[k] != null)
            continue;

        // The sequence is in observation order, not sorted by value, so it must be scanned linearly;
        // a binary search here has undefined results and can misclassify a known write as unknown.
        int i = indexOfWrite(registers[k].sequence, bufWrites[k]);
        if (i >= 0)
        {
            bufNewPeerSteps[k] = i + 1;
        }
        else
        {
            bufUnknownSteps[k] = new UnknownStepHolder(bufWrites[k], start, end, new Step(k, Integer.MAX_VALUE, keyCount, bufWrites[k]));
            hasUnknownSteps = true;
        }
    }

    for (int k = 0; k < bufReads.length ; ++k)
    {
        if (bufReads[k] != null)
            registers[k].updateSequence(bufReads[k], bufWrites[k]);
    }

    for (int k = 0; k < bufReads.length ; ++k)
    {
        if (bufWrites[k] >= 0 || bufReads[k] != null)
        {
            registers[k].updatePeersAndPredecessors(bufNewPeerSteps, bufReads, bufWrites, start, end, hasUnknownSteps ? bufUnknownSteps : null);
            if (bufReads[k] != null)
                registers[k].checkForUnwitnessed(start);
        }
    }

    refreshTransitive();
}

/**
 * Position of {@code writeValue} within {@code sequence}, or -1 if it is absent.
 */
private static int indexOfWrite(int[] sequence, int writeValue)
{
    for (int i = 0; i < sequence.length; ++i)
    {
        if (sequence[i] == writeValue)
            return i;
    }
    return -1;
}
/**
 * Drains the refresh queue in ascending order, re-propagating each node's predecessor step to the
 * step that owns the node. Propagation may enqueue further nodes; the loop runs until the queue is empty.
 */
private void refreshTransitive()
{
    MaxPredecessor node;
    while ((node = refresh.pollFirst()) != null)
    {
        Register owner = registers[node.ofKey];
        owner.propagateToSuccessor(node.predecessorStep, node.ofStep);
    }
}
/**
 * Prints the pending (unknown-step) writes of every register, skipping any null entries.
 */
public void print()
{
    for (Register register : registers)
    {
        if (register != null)
            register.print();
    }
}
/**
 * Prints the pending (unknown-step) writes of the register for key {@code k}.
 */
public void print(int k)
{
    Register register = registers[k];
    register.print();
}
}