blob: 185c1437aaf11b2d335a116fe75622da5203bd15 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package accord.impl;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.Key;
import accord.local.Command;
import accord.local.SafeCommandStore.CommandFunction;
import accord.local.SafeCommandStore.TestDep;
import accord.local.SafeCommandStore.TestStartedAt;
import accord.local.SafeCommandStore.TestStatus;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.primitives.Ballot;
import accord.primitives.Timestamp;
import accord.primitives.Txn.Kind.Kinds;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.SortedArrays;
import accord.utils.SortedList;
import static accord.impl.CommandsForKey.InternalStatus.ACCEPTED;
import static accord.impl.CommandsForKey.InternalStatus.COMMITTED;
import static accord.impl.CommandsForKey.InternalStatus.STABLE;
import static accord.impl.CommandsForKey.InternalStatus.HISTORICAL;
import static accord.impl.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATED;
import static accord.impl.CommandsForKey.InternalStatus.TRANSITIVELY_KNOWN;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.ArrayBuffers.cachedTxnIds;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.isParanoid;
import static accord.utils.SortedArrays.linearUnion;
* A specialised collection for efficiently representing and querying everything we need for making coordination
* and recovery decisions about a key's command conflicts.
* Every command we know about that is not shard-redundant is listed in the TxnId[] collection, which is sorted by TxnId.
* This list implies the contents of the deps of all commands in the collection - it is assumed that in the normal course
* of events every transaction will include the full set of TxnId we know. We only encode divergences from this, stored
* in each command's {@code missing} collection.
* We then go one step further, exploiting the fact that the missing collection exists solely to implement recovery,
* and so we elide from this missing collection any TxnId we have recorded as Committed or higher.
* Any recovery coordinator that contacts this replica will report that the command has been agreed to execute,
* and so will not need to decipher any fast-path decisions. So the missing collection is redundant, as no command's deps
* will be queried for this TxnId's presence/absence.
* TODO (expected) this logic applies equally well to Accepted
* The goal with these behaviours is that this missing collection will ordinarily be empty, and represented by the exact
* same NO_TXNIDS array instance as every other command.
* We also exploit the property that most commands will also agree to execute at their proposed TxnId. If we have
* no missing collection to encode, and no modified executeAt, we store a global NoInfo object that takes up no
* space on heap. These NoInfo objects permit further efficiencies, as we may perform class-pointer comparisons
* before querying any contents to skip uninteresting contents, permitting fast iteration of the collection's contents.
* We also impose the condition that every TxnId is uniquely represented in the collection, so any executeAt and missing
* collection that represents the same value as the TxnId must be the same object present in the main TxnId[].
* This collection also implements transitive dependency elision.
* When evaluating mapReduceActive, we first establish the last-executing Stable write command (i.e. one whose deps
* are considered durably decided, and which must therefore wait for all commands Committed with a lower executeAt).
* We then elide any Committed command that has a lower executeAt than this command.
* Both commands must be known at a majority, but neither might be Committed at any other replica.
* Either command may therefore be recovered.
* If the later command is recovered, this replica will report its Stable deps thereby recovering them.
* If this replica is not contacted, some other replica must participate that either has taken the same action as this replica,
* or else does not know the later command is Stable, and so will report the earlier command as a dependency again.
* If the earlier command is recovered, this replica will report that it is Committed, and so will not consult
* this replica's collection to decipher any fast path decision. Any other replica must either do the same, or else
* will correctly record this transaction as present in any relevant deps of later transactions.
* TODO (expected): optimisations:
* 1) we probably can rely only on COMMITTED status rather than STABLE for filtering active, but require STABLE still for filtering on status
* 2) do we need PREAPPLIED state?
* 3) consider storing a prefix of TxnId that are all NoInfo PreApplied encoded as a BitStream as only required for computing missing collection
* 4) consider storing (or caching) an int[] of records with an executeAt that occurs out of order, sorted by executeAt
* TODO (required): randomised testing
public class CommandsForKey implements CommandsSummary
// Switch consulted by mapReduceActive: when true, entries executing before the latest stable write may be skipped
private static final boolean PRUNE_TRANSITIVE_DEPENDENCIES = true;
// Shared empty arrays; code in this class compares against them by reference (e.g. missing == NO_TXNIDS)
public static final TxnId[] NO_TXNIDS = new TxnId[0];
public static final Info[] NO_INFOS = new Info[0];
public static class SerializerSupport
public static CommandsForKey create(Key key, TxnId redundantBefore, TxnId[] txnIds, Info[] infos)
return new CommandsForKey(key, redundantBefore, txnIds, infos);
// Compact status kept per entry; SaveStatus values are translated to it through the 'convert' table below.
// NOTE(review): only two constants are visible here, yet the code references PREACCEPTED, ACCEPTED, COMMITTED,
// STABLE, PREAPPLIED and INVALID_OR_TRUNCATED - their declarations appear to be missing from this copy.
public enum InternalStatus
TRANSITIVELY_KNOWN(false, false), // (unwitnessed; no need for mapReduce to witness)
HISTORICAL(false, false),
// TODO (required): we can probably retire this status; STABLE is sufficient
// SaveStatus -> InternalStatus lookup; a SaveStatus with no entry yields null from from()
static final EnumMap<SaveStatus, InternalStatus> convert = new EnumMap<>(SaveStatus.class);
// values() captured once so get(int) can index it directly
static final InternalStatus[] VALUES = values();
// NOTE(review): the puts below presumably sit inside a static initialiser whose delimiters are not visible here
convert.put(SaveStatus.PreAccepted, PREACCEPTED);
convert.put(SaveStatus.AcceptedInvalidateWithDefinition, PREACCEPTED);
convert.put(SaveStatus.Accepted, ACCEPTED);
convert.put(SaveStatus.AcceptedWithDefinition, ACCEPTED);
convert.put(SaveStatus.PreCommittedWithDefinition, PREACCEPTED);
convert.put(SaveStatus.PreCommittedWithAcceptedDeps, ACCEPTED);
convert.put(SaveStatus.PreCommittedWithDefinitionAndAcceptedDeps, ACCEPTED);
convert.put(SaveStatus.Committed, COMMITTED);
convert.put(SaveStatus.Stable, STABLE);
convert.put(SaveStatus.ReadyToExecute, STABLE);
convert.put(SaveStatus.PreApplied, PREAPPLIED);
convert.put(SaveStatus.Applying, PREAPPLIED);
convert.put(SaveStatus.Applied, PREAPPLIED);
convert.put(SaveStatus.TruncatedApplyWithDeps, INVALID_OR_TRUNCATED);
convert.put(SaveStatus.TruncatedApplyWithOutcome, INVALID_OR_TRUNCATED);
convert.put(SaveStatus.TruncatedApply, INVALID_OR_TRUNCATED);
convert.put(SaveStatus.ErasedOrInvalidated, INVALID_OR_TRUNCATED);
convert.put(SaveStatus.Erased, INVALID_OR_TRUNCATED);
convert.put(SaveStatus.Invalidated, INVALID_OR_TRUNCATED);
// flag returned by both hasExecuteAt() and hasDeps()
public final boolean hasInfo;
// one shared NoInfo instance per status
public final NoInfo asNoInfo;
// one shared wrapper per status pairing asNoInfo with zero additions
public final InfoWithAdditions asNoInfoOrAdditions;
// this status itself, or null when constructed with expectMatch == false
final InternalStatus expectMatch;
// convenience constructor: expectMatch defaults to true
InternalStatus(boolean hasInfo)
this(hasInfo, true);
InternalStatus(boolean hasInfo, boolean expectMatch)
this.hasInfo = hasInfo;
this.asNoInfo = new NoInfo(this);
this.asNoInfoOrAdditions = new InfoWithAdditions(asNoInfo, NO_TXNIDS, 0);
this.expectMatch = expectMatch ? this : null;
boolean hasExecuteAt()
return hasInfo;
boolean hasDeps()
return hasInfo;
// true only for STABLE and PREAPPLIED
boolean hasStableDeps()
return this == STABLE || this == PREAPPLIED;
// Returns the timestamp before which this status knows its deps: txnId, or executeAt (falling back to txnId when null).
// NOTE(review): the case labels selecting between the three outcomes below are mostly absent from this copy.
public Timestamp depsKnownBefore(TxnId txnId, @Nullable Timestamp executeAt)
switch (this)
default: throw new AssertionError("Unhandled InternalStatus: " + this);
throw new AssertionError("Invalid InternalStatus to know deps");
return txnId;
case STABLE:
return executeAt == null ? txnId : executeAt;
// Looks up the InternalStatus for a SaveStatus; null when the table has no mapping
public static InternalStatus from(SaveStatus status)
return convert.get(status);
// Returns the constant at the given ordinal position in VALUES
public static InternalStatus get(int ordinal)
return VALUES[ordinal];
public static class Info
public final InternalStatus status;
// ACCESS DIRECTLY WITH CARE: if null, TxnId is implied; use accessor method to ensure correct value is returned
public final @Nullable Timestamp executeAt;
public final TxnId[] missing; // those TxnId we know of that would be expected to be found in the provided deps, but aren't
private Info(InternalStatus status, @Nullable Timestamp executeAt, TxnId[] missing)
this.status = status;
this.executeAt = executeAt;
this.missing = missing;
public static Info create(@Nonnull TxnId txnId, InternalStatus status, @Nonnull Timestamp executeAt, @Nonnull TxnId[] missing)
return new Info(status,
Invariants.checkArgument(executeAt, executeAt == txnId || executeAt.compareTo(txnId) > 0),
Invariants.checkArgument(missing, missing == NO_TXNIDS || missing.length > 0));
public static Info createMock(InternalStatus status, @Nullable Timestamp executeAt, TxnId[] missing)
return new Info(status, executeAt, Invariants.checkArgument(missing, missing == null || missing == NO_TXNIDS));
public Timestamp executeAt(TxnId txnId)
return executeAt;
Timestamp depsKnownBefore(TxnId txnId)
return status.depsKnownBefore(txnId, executeAt);
Info update(TxnId txnId, TxnId[] newMissing)
return newMissing == NO_TXNIDS && executeAt == txnId ? status.asNoInfo : Info.create(txnId, status, executeAt(txnId), newMissing);
public boolean equals(Object o)
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Info info = (Info) o;
return status == info.status && Objects.equals(executeAt, info.executeAt) && Arrays.equals(missing, info.missing);
public int hashCode()
throw new UnsupportedOperationException();
public String toString()
return "Info{" +
"status=" + status +
", executeAt=" + executeAt +
", missing=" + Arrays.toString(missing) +
public static class NoInfo extends Info
NoInfo(InternalStatus status)
super(status, null, NO_TXNIDS);
public Timestamp executeAt(TxnId txnId)
return txnId;
// the key this collection belongs to
private final Key key;
// bound below which entries are dropped by withoutRedundant
private final TxnId redundantBefore;
// any transactions that are durably decided (i.e. STABLE) and execute before this are durably dependent, and can be elided from mapReduceActive
private final @Nullable Timestamp maxStableWrite;
// parallel arrays: infos[i] describes txnIds[i]; txnIds is checked to be sorted and unique when paranoid checks are on
private final TxnId[] txnIds;
private final Info[] infos;
/**
 * Builds a collection, deriving the max stable write by scanning the supplied arrays
 * with {@code Timestamp.MAX} as the upper bound.
 */
CommandsForKey(Key key, TxnId redundantBefore, TxnId[] txnIds, Info[] infos)
{
    this(key, redundantBefore, maxStableWriteBefore(Timestamp.MAX, txnIds, infos), txnIds, infos);
}
/**
 * Canonical constructor: stores every argument as-is and, when paranoid checks are enabled,
 * verifies that {@code txnIds} is sorted and free of duplicates.
 */
CommandsForKey(Key key, TxnId redundantBefore, @Nullable Timestamp maxStableWrite, TxnId[] txnIds, Info[] infos)
{
    this.key = key;
    this.redundantBefore = redundantBefore;
    this.maxStableWrite = maxStableWrite;
    this.txnIds = txnIds;
    this.infos = infos;

    if (isParanoid())
    {
        Invariants.checkArgument(SortedArrays.isSortedUnique(txnIds));
    }
}
/**
 * Creates an empty collection for {@code key}: no entries, no max stable write,
 * and {@code TxnId.NONE} as the redundancy bound.
 */
public CommandsForKey(Key key)
{
    this.key = key;
    this.txnIds = NO_TXNIDS;
    this.infos = NO_INFOS;
    this.maxStableWrite = null;
    this.redundantBefore = TxnId.NONE;
}
public String toString()
return "CommandsForKey@" + System.identityHashCode(this) + '{' + key + '}';
public Key key()
return key;
public int size()
return txnIds.length;
public int indexOf(TxnId txnId)
return Arrays.binarySearch(txnIds, txnId);
public TxnId txnId(int i)
return txnIds[i];
public Info info(int i)
return infos[i];
public TxnId redundantBefore()
return redundantBefore;
/**
 * All commands before/after (exclusive of) the given timestamp
 * <p>
 * Note that {@code testDep} applies only to commands that MAY have the command in their deps; if specified any
 * commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId.
 * <p>
 * Folds {@code map} over the matching entries, threading {@code initialValue} through as the accumulator.
 */
// NOTE(review): this copy of the method has lost its braces and several single-word lines (case labels,
// continue statements); only the diagnostic message below has been corrected, every other token is as found.
public <P1, T> T mapReduceFull(TxnId testTxnId,
Kinds testKind,
TestStartedAt testStartedAt,
TestDep testDep,
TestStatus testStatus,
CommandFunction<P1, T, T> map, P1 p1, T initialValue)
int start, end;
boolean isKnown;
int insertPos = Arrays.binarySearch(txnIds, testTxnId);
isKnown = insertPos >= 0;
// an id we have never recorded cannot appear in anyone's deps
if (!isKnown && testDep == WITH) return initialValue;
if (!isKnown) insertPos = -1 - insertPos;
switch (testStartedAt)
// report the enum value that was not handled (previously printed the TxnId by mistake)
default: throw new AssertionError("Unhandled TestStartedAt: " + testStartedAt);
case STARTED_BEFORE: start = 0; end = insertPos; break;
case STARTED_AFTER: start = insertPos; end = txnIds.length; break;
case ANY: start = 0; end = txnIds.length;
for (int i = start; i < end ; ++i)
TxnId txnId = txnIds[i];
if (!testKind.test(txnId.kind())) continue;
Info info = infos[i];
InternalStatus status = info.status;
// NOTE(review): the case labels guarding the two status filters below are not visible in this copy
switch (testStatus)
default: throw new AssertionError("Unhandled TestStatus: " + testStatus);
if (status == ACCEPTED || status == COMMITTED) break;
else continue;
if (status.compareTo(STABLE) >= 0 && status.compareTo(INVALID_OR_TRUNCATED) < 0) break;
else continue;
Timestamp executeAt = info.executeAt(txnId);
// NOTE(review): the three conditions below were presumably each followed by a skip of the entry - confirm
if (testDep != ANY_DEPS)
if (!status.hasInfo)
if (executeAt.compareTo(testTxnId) <= 0)
// absence from 'missing' is treated as presence in the entry's deps
boolean hasAsDep = Arrays.binarySearch(info.missing, testTxnId) < 0;
if (hasAsDep != (testDep == WITH))
initialValue = map.apply(p1, key, txnId, executeAt, initialValue);
return initialValue;
// Folds 'map' over entries whose TxnId sorts before 'startedBefore' and whose kind passes 'testKind',
// threading 'initialValue' through as the accumulator.
// NOTE(review): braces, a skip after the kind test and most case labels of the status switch are missing
// from this copy, so the exact set of statuses that reach map.apply cannot be confirmed from here.
public <P1, T> T mapReduceActive(Timestamp startedBefore,
Kinds testKind,
CommandFunction<P1, T, T> map, P1 p1, T initialValue)
// latest stable write executing before the bound, or null when there is none
Timestamp maxStableWrite = maxStableWriteBefore(startedBefore);
int start = 0, end = insertPos(startedBefore);
for (int i = start; i < end ; ++i)
TxnId txnId = txnIds[i];
if (!testKind.test(txnId.kind()))
Info info = infos[i];
switch (info.status)
case STABLE:
// TODO (expected): prove the correctness of this approach
// true when pruning is off, no stable write is known, or this entry executes at/after the stable write
if (!PRUNE_TRANSITIVE_DEPENDENCIES || maxStableWrite == null || info.executeAt(txnId).compareTo(maxStableWrite) >= 0)
initialValue = map.apply(p1, key, txnId, info.executeAt(txnId), initialValue);
return initialValue;
public CommandsForKey update(Command prev, Command next)
InternalStatus newStatus = InternalStatus.from(next.saveStatus());
if (newStatus == null)
return this;
TxnId txnId = next.txnId();
int pos = Arrays.binarySearch(txnIds, txnId);
if (pos < 0)
pos = -1 - pos;
if (!newStatus.hasInfo)
return insert(pos, txnId, newStatus.asNoInfo);
return insert(pos, txnId, newStatus, next);
// we do not permit multiple versions of the same TxnId, so use the existing one
txnId = txnIds[pos];
// update
InternalStatus prevStatus = prev == null ? null : InternalStatus.from(prev.saveStatus());
if (newStatus == prevStatus && (!newStatus.hasInfo || next.acceptedOrCommitted().equals(prev.acceptedOrCommitted())))
return this;
@Nonnull Info cur = Invariants.nonNull(infos[pos]);
// TODO (required): HACK to permit prev.saveStatus() == SaveStatus.AcceptedInvalidateWithDefinition as we don't always update as keys aren't guaranteed to be provided
// fix as soon as we support async updates
Invariants.checkState(cur.status.expectMatch == prevStatus || (prev != null && prev.status() == Status.AcceptedInvalidate));
if (newStatus.hasInfo) return update(pos, txnId, cur, newStatus, next);
else return update(pos, cur, newStatus.asNoInfo);
public static boolean needsUpdate(Command prev, Command updated)
if (!updated.txnId().kind().isGloballyVisible())
return false;
SaveStatus prevStatus;
Ballot prevAcceptedOrCommitted;
if (prev == null)
prevStatus = SaveStatus.NotDefined;
prevAcceptedOrCommitted = Ballot.ZERO;
prevStatus = prev.saveStatus();
prevAcceptedOrCommitted = prev.acceptedOrCommitted();
return needsUpdate(prevStatus, prevAcceptedOrCommitted, updated.saveStatus(), updated.acceptedOrCommitted());
public static boolean needsUpdate(SaveStatus prevStatus, Ballot prevAcceptedOrCommitted, SaveStatus updatedStatus, Ballot updatedAcceptedOrCommitted)
InternalStatus prev = InternalStatus.from(prevStatus);
InternalStatus updated = InternalStatus.from(updatedStatus);
return updated != prev || (updated != null && updated.hasInfo && !prevAcceptedOrCommitted.equals(updatedAcceptedOrCommitted));
private Timestamp maxStableWriteBefore(Timestamp before)
if (maxStableWrite == null)
return null;
if (maxStableWrite.compareTo(before) < 0)
return maxStableWrite;
return maxStableWriteBefore(before, txnIds, infos);
private static Timestamp maxStableWriteBefore(Timestamp before, TxnId[] txnIds, Info[] infos)
int i = Arrays.binarySearch(txnIds, before);
if (i < 0) i = -1 -i;
Timestamp max = null;
while (--i >= 0)
TxnId txnId = txnIds[i];
if (txnId.kind() != Write) continue;
Info info = infos[i];
if (!info.status.hasStableDeps()) continue;
if (info.executeAt == null) max = txnId;
else if (info.executeAt.compareTo(before) < 0) max = info.executeAt;
else continue;
while (--i >= 0)
Info info = infos[i];
TxnId txnId = txnIds[i];
if (info.getClass() == NoInfo.class) continue;
if (!info.status.hasStableDeps()) continue;
if (txnId.kind() != Write) continue;
if (info.executeAt.compareTo(max) <= 0 || info.executeAt.compareTo(before) >= 0) continue;
max = info.executeAt;
return max;
private CommandsForKey insert(int insertPos, TxnId insertTxnId, InternalStatus newStatus, Command command)
InfoWithAdditions newInfo = computeInsert(insertPos, insertTxnId, newStatus, command);
if (newInfo.additionCount == 0)
return insert(insertPos, insertTxnId,;
TxnId[] newTxnIds = new TxnId[txnIds.length + newInfo.additionCount + 1];
Info[] newInfos = new Info[newTxnIds.length];
insertWithAdditions(insertPos, insertTxnId, newInfo, newTxnIds, newInfos);
return update(newTxnIds, newInfos, insertTxnId,;
private CommandsForKey update(int updatePos, TxnId txnId, Info prevInfo, InternalStatus newStatus, Command command)
InfoWithAdditions newInfo = computeUpdate(updatePos, txnId, newStatus, command);
if (newInfo.additionCount == 0)
return update(updatePos, prevInfo,;
TxnId[] newTxnIds = new TxnId[txnIds.length + newInfo.additionCount];
Info[] newInfos = new Info[newTxnIds.length];
TxnId updateTxnId = txnIds[updatePos]; // want to reuse the existing TxnId for object identity
int newPos = updateWithAdditions(updatePos, txnIds[updatePos], newInfo, newTxnIds, newInfos);
if (prevInfo.status.compareTo(COMMITTED) < 0 && newStatus.compareTo(COMMITTED) >= 0)
removeMissing(newTxnIds, newInfos, newPos);
return update(newTxnIds, newInfos, updateTxnId,;
private int updateWithAdditions(int updatePos, TxnId updateTxnId, InfoWithAdditions withInfo, TxnId[] newTxnIds, Info[] newInfos)
return updateOrInsertWithAdditions(updatePos, updatePos, updateTxnId, withInfo, newTxnIds, newInfos);
private void insertWithAdditions(int pos, TxnId updateTxnId, InfoWithAdditions withInfo, TxnId[] newTxnIds, Info[] newInfos)
updateOrInsertWithAdditions(pos, -1, updateTxnId, withInfo, newTxnIds, newInfos);
// Merges the existing txnIds/infos with withInfo.additions into newTxnIds/newInfos, placing updateTxnId at
// sourceInsertPos shifted by the number of additions that sort before it, and returns that target index.
// sourceUpdatePos is the existing index being replaced, or negative for a pure insert.
// NOTE(review): this copy has lost its braces, several single-word lines (else / counter increments) and
// every right-hand side reading the info out of withInfo (see the dangling '=;' and '&& < 0' below).
private int updateOrInsertWithAdditions(int sourceInsertPos, int sourceUpdatePos, TxnId updateTxnId, InfoWithAdditions withInfo, TxnId[] newTxnIds, Info[] newInfos)
TxnId[] additions = withInfo.additions;
int additionCount = withInfo.additionCount;
int additionInsertPos = Arrays.binarySearch(additions, 0, additionCount, updateTxnId);
// updateTxnId must not itself be one of the additions, so the search result has to be negative
additionInsertPos = Invariants.checkArgument(-1 - additionInsertPos, additionInsertPos < 0);
int targetInsertPos = sourceInsertPos + additionInsertPos;
// additions plus the updateTxnId when necessary
TxnId[] missingSource = additions;
// NOTE(review): second operand is truncated; presumably a status-below-COMMITTED test on the new info - confirm
boolean insertSelfMissing = sourceUpdatePos < 0 && < 0;
// the most recently constructed pure insert missing array, so that it may be reused if possible
TxnId[] cachedMissing = null;
// i walks the existing entries, j the additions, count the output position
int i = 0, j = 0, missingCount = 0, missingLimit = additionCount, count = 0;
while (i < infos.length)
if (count == targetInsertPos)
newTxnIds[count] = updateTxnId;
newInfos[count] =;
if (i == sourceUpdatePos) ++i;
else if (insertSelfMissing) ++missingCount;
// once additions are exhausted every remaining existing entry sorts first
int c = j == additionCount ? -1 : txnIds[i].compareTo(additions[j]);
if (c < 0)
TxnId txnId = txnIds[i];
Info info = infos[i];
if (i == sourceUpdatePos)
info =;
else if (info.status.hasDeps())
Timestamp depsKnownBefore = info.status.depsKnownBefore(txnId, info.executeAt);
// switch to the merged source (additions plus updateTxnId) the first time an entry needs it
if (insertSelfMissing && missingSource == additions && (missingCount != j || (depsKnownBefore != txnId && depsKnownBefore.compareTo(updateTxnId) > 0)))
missingSource = mergeMissing(additions, additionCount, updateTxnId, additionInsertPos);
int to = to(txnId, depsKnownBefore, missingSource, missingCount, missingLimit);
if (to > 0)
// reuse a cached prefix copy when the entry had nothing missing before, otherwise union the two arrays
TxnId[] missing = info.missing == NO_TXNIDS && to == missingCount
? cachedMissing = ensureCachedMissing(missingSource, to, cachedMissing)
: linearUnion(info.missing, info.missing.length, missingSource, to, cachedTxnIds());
info = info.update(txnId, missing);
newTxnIds[count] = txnId;
newInfos[count] = info;
else if (c > 0)
// an addition sorts first: it enters the collection as TRANSITIVELY_KNOWN
newTxnIds[count] = additions[j++];
newInfos[count] = TRANSITIVELY_KNOWN.asNoInfo;
throw illegalState(txnIds[i] + " should be an insertion, but found match when merging with origin");
// existing entries exhausted: flush remaining additions, placing updateTxnId if it has not been written yet
if (j < additionCount)
if (count <= targetInsertPos)
int length = targetInsertPos - count;
System.arraycopy(additions, j, newTxnIds, count, length);
Arrays.fill(newInfos, count, targetInsertPos, TRANSITIVELY_KNOWN.asNoInfo);
newTxnIds[targetInsertPos] = updateTxnId;
newInfos[targetInsertPos] =;
count = targetInsertPos + 1;
j = additionInsertPos;
System.arraycopy(additions, j, newTxnIds, count, additionCount - j);
Arrays.fill(newInfos, count, count + additionCount - j, TRANSITIVELY_KNOWN.asNoInfo);
else if (count == targetInsertPos)
newTxnIds[targetInsertPos] = updateTxnId;
newInfos[targetInsertPos] =;
// hand the additions buffer back to the cache
cachedTxnIds().forceDiscard(additions, additionCount);
return targetInsertPos;
private CommandsForKey update(int pos, Info curInfo, Info newInfo)
if (curInfo == newInfo)
return this;
Info[] newInfos = infos.clone();
newInfos[pos] = newInfo;
if (curInfo.status.compareTo(COMMITTED) < 0 && newInfo.status.compareTo(COMMITTED) >= 0)
removeMissing(txnIds, newInfos, pos);
return update(txnIds, newInfos, txnIds[pos], newInfo);
* Insert a new txnId and info
private CommandsForKey insert(int pos, TxnId newTxnId, Info newInfo)
TxnId[] newTxnIds = new TxnId[txnIds.length + 1];
System.arraycopy(txnIds, 0, newTxnIds, 0, pos);
newTxnIds[pos] = newTxnId;
System.arraycopy(txnIds, pos, newTxnIds, pos + 1, txnIds.length - pos);
Info[] newInfos = new Info[infos.length + 1];
if (newInfo.status.compareTo(COMMITTED) >= 0)
System.arraycopy(infos, 0, newInfos, 0, pos);
newInfos[pos] = newInfo;
System.arraycopy(infos, pos, newInfos, pos + 1, infos.length - pos);
insertInfoAndOneMissing(pos, newTxnId, newInfo, infos, newInfos, newTxnIds, 1);
return update(newTxnIds, newInfos, newTxnId, newInfo);
/**
 * Insert a new txnId and info, then insert the txnId into the missing collection of any command that should have already caused us to witness it
 */
// Fills newInfos from oldInfos, shifting entries at/after insertPos by offsetAfterInsertPos.
// NOTE(review): braces and the single-word lines (else / continue) separating the alternatives are missing
// from this copy; the paired assignments to 'missing' and to newInfos[i] are presumably either/or branches.
private static void insertInfoAndOneMissing(int insertPos, TxnId txnId, Info newInfo, Info[] oldInfos, Info[] newInfos, TxnId[] newTxnIds, int offsetAfterInsertPos)
// single-element array { txnId }, created lazily and shared by every entry that had nothing missing
TxnId[] oneMissing = null;
// entries sorting before the inserted id
for (int i = 0 ; i < insertPos ; ++i)
Info oldInfo = oldInfos[i];
if (oldInfo.getClass() != NoInfo.class)
// passing null: the result is null whenever the status would have answered with the TxnId itself
Timestamp depsKnownBefore = oldInfo.depsKnownBefore(null);
if (depsKnownBefore != null && depsKnownBefore.compareTo(txnId) > 0)
TxnId[] missing;
if (oldInfo.missing == NO_TXNIDS)
missing = oneMissing = ensureOneMissing(txnId, oneMissing);
missing = SortedArrays.insert(oldInfo.missing, txnId, TxnId[]::new);
// NOTE(review): the first argument here is the inserted id, not the id of entry i - confirm this is intended
newInfos[i] = Info.create(txnId, oldInfo.status, oldInfo.executeAt, missing);
newInfos[i] = oldInfo;
newInfos[insertPos] = newInfo;
// entries sorting after the inserted id
for (int i = insertPos; i < oldInfos.length ; ++i)
Info oldInfo = oldInfos[i];
if (!oldInfo.status.hasDeps())
newInfos[i + offsetAfterInsertPos] = oldInfo;
TxnId[] missing;
if (oldInfo.missing == NO_TXNIDS)
missing = oneMissing = ensureOneMissing(txnId, oneMissing);
missing = SortedArrays.insert(oldInfo.missing, txnId, TxnId[]::new);
int newIndex = i + offsetAfterInsertPos;
newInfos[newIndex] = Info.create(newTxnIds[newIndex], oldInfo.status, oldInfo.executeAt(newTxnIds[newIndex]), missing);
private static void removeMissing(TxnId[] txnIds, Info[] infos, int pos)
TxnId removeTxnId = txnIds[pos];
for (int i = 0 ; i < infos.length ; ++i)
Info info = infos[i];
if (info.getClass() == NoInfo.class) continue;
TxnId[] missing = info.missing;
if (missing == NO_TXNIDS) continue;
// linear scan on object identity going to be faster in practice than binary search almost every time
int j = 0;
while (j < missing.length && missing[j] != removeTxnId) ++j;
if (j == missing.length) continue;
if (missing.length == 1)
missing = NO_TXNIDS;
int length = missing.length;
TxnId[] newMissing = new TxnId[length - 1];
System.arraycopy(missing, 0, newMissing, 0, j);
System.arraycopy(missing, j + 1, newMissing, j, length - (1 + j));
missing = newMissing;
infos[i] = info.update(txnIds[i], missing);
private static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing)
return oneMissing != null ? oneMissing : new TxnId[] { txnId };
private static TxnId[] ensureCachedMissing(TxnId[] missingIds, int missingIdCount, TxnId[] cachedMissing)
return cachedMissing != null && cachedMissing.length == missingIdCount ? cachedMissing : Arrays.copyOf(missingIds, missingIdCount);
private static TxnId[] mergeMissing(TxnId[] additions, int additionCount, TxnId updateTxnId, int additionInsertPos)
TxnId[] missingSource = new TxnId[additionCount + 1];
System.arraycopy(additions, 0, missingSource, 0, additionInsertPos);
System.arraycopy(additions, additionInsertPos, missingSource, additionInsertPos + 1, additionCount - additionInsertPos);
missingSource[additionInsertPos] = updateTxnId;
return missingSource;
private static int to(TxnId txnId, Timestamp depsKnownBefore, TxnId[] missingSource, int missingCount, int missingLimit)
if (depsKnownBefore == txnId) return missingCount;
int to = Arrays.binarySearch(missingSource, 0, missingLimit, depsKnownBefore);
if (to < 0) to = -1 - to;
return to;
static class InfoWithAdditions
final Info info;
final TxnId[] additions;
final int additionCount;
InfoWithAdditions(Info info, TxnId[] additions, int additionCount)
{ = info;
this.additions = additions;
this.additionCount = additionCount;
private InfoWithAdditions computeInsert(int insertPos, TxnId txnId, InternalStatus newStatus, Command command)
return computeInfoAndAdditions(insertPos, -1, txnId, newStatus, command);
private InfoWithAdditions computeUpdate(int updatePos, TxnId txnId, InternalStatus newStatus, Command command)
return computeInfoAndAdditions(updatePos, updatePos, txnId, newStatus, command);
private InfoWithAdditions computeInfoAndAdditions(int insertPos, int updatePos, TxnId txnId, InternalStatus newStatus, Command command)
Timestamp executeAt = txnId;
if (newStatus.hasInfo)
executeAt = command.executeAt();
if (executeAt.equals(txnId)) executeAt = txnId;
Timestamp depsKnownBefore = newStatus.depsKnownBefore(txnId, executeAt);
return computeInfoAndAdditions(insertPos, updatePos, txnId, newStatus, executeAt, depsKnownBefore, command.partialDeps().keyDeps.txnIds(key));
// Walks the recorded txnIds (up to the deps-known-before position) and the command's deps together, collecting:
//  - 'missing': recorded ids absent from deps (excluding the entry itself and anything COMMITTED or later)
//  - 'additions': dep ids this collection has not recorded
// and wraps the resulting Info with the additions.
// NOTE(review): braces, the else lines and the index increments of the merge loops are missing from this copy.
private InfoWithAdditions computeInfoAndAdditions(int insertPos, int updatePos, TxnId txnId, InternalStatus newStatus, Timestamp executeAt, Timestamp depsKnownBefore, SortedList<TxnId> deps)
int depsKnownBeforePos;
// same object: deps are known only up to the entry's own position
if (depsKnownBefore == txnId)
depsKnownBeforePos = insertPos;
depsKnownBeforePos = Arrays.binarySearch(txnIds, insertPos, txnIds.length, depsKnownBefore);
// the bound is required not to coincide with a recorded TxnId
Invariants.checkState(depsKnownBeforePos < 0);
depsKnownBeforePos = -1 - depsKnownBeforePos;
TxnId[] additions = NO_TXNIDS, missing = NO_TXNIDS;
int additionCount = 0, missingCount = 0;
// deps sorting before redundantBefore are skipped
int depsIndex = deps.find(redundantBefore);
if (depsIndex < 0) depsIndex = -1 - depsIndex;
int txnIdsIndex = 0;
while (txnIdsIndex < depsKnownBeforePos && depsIndex < deps.size())
TxnId t = txnIds[txnIdsIndex];
TxnId d = deps.get(depsIndex);
int c = t.compareTo(d);
if (c == 0)
else if (c < 0)
// we expect to be missing ourselves
// we also permit any transaction we have recorded as COMMITTED or later to be missing, as recovery will not need to consult our information
if (txnIdsIndex != updatePos && infos[txnIdsIndex].status.compareTo(COMMITTED) < 0)
// grow the buffer (at least 8, otherwise doubling) through the cached-array pool
if (missingCount == missing.length)
missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2));
missing[missingCount++] = t;
if (additionCount >= additions.length)
additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2));
additions[additionCount++] = d;
// deps exhausted: every remaining recorded id below the bound is a candidate for 'missing'
while (txnIdsIndex < depsKnownBeforePos)
if (txnIdsIndex != updatePos && infos[txnIdsIndex].status.compareTo(COMMITTED) < 0)
TxnId t = txnIds[txnIdsIndex];
if (missingCount == missing.length)
missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2));
missing[missingCount++] = t;
// recorded ids exhausted: every remaining dep is an addition
while (depsIndex < deps.size())
if (additionCount >= additions.length)
additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2));
additions[additionCount++] = deps.get(depsIndex++);
// nothing missing and executeAt is the id object itself: reuse the shared per-status instances
if (missingCount == 0 && executeAt == txnId)
return additionCount == 0 ? newStatus.asNoInfoOrAdditions : new InfoWithAdditions(newStatus.asNoInfo, additions, additionCount);
return new InfoWithAdditions(Info.create(txnId, newStatus, executeAt, cachedTxnIds().completeAndDiscard(missing, missingCount)), additions, additionCount);
private CommandsForKey update(TxnId[] newTxnIds, Info[] newInfos, TxnId updatedTxnId, Info updatedInfo)
Timestamp maxStableWrite = maybeUpdateMaxStableWrite(updatedTxnId, updatedInfo, this.maxStableWrite);
return new CommandsForKey(key, redundantBefore, maxStableWrite, newTxnIds, newInfos);
private static Timestamp maybeUpdateMaxStableWrite(TxnId txnId, Info info, Timestamp maxStableWrite)
if (!info.status.hasStableDeps() || txnId.kind() != Write)
return maxStableWrite;
Timestamp executeAt = info.executeAt(txnId);
if (maxStableWrite != null && maxStableWrite.compareTo(executeAt) >= 0)
return maxStableWrite;
return executeAt;
public CommandsForKey withoutRedundant(TxnId redundantBefore)
if (this.redundantBefore.compareTo(redundantBefore) >= 0)
return this;
int pos = insertPos(redundantBefore);
if (pos == 0)
return new CommandsForKey(key, redundantBefore, maxStableWrite, txnIds, infos);
TxnId[] newTxnIds = Arrays.copyOfRange(txnIds, pos, txnIds.length);
Info[] newInfos = Arrays.copyOfRange(infos, pos, infos.length);
for (int i = 0 ; i < newInfos.length ; ++i)
Info info = newInfos[i];
if (info.getClass() == NoInfo.class) continue;
if (info.missing == NO_TXNIDS) continue;
int j = Arrays.binarySearch(info.missing, redundantBefore);
if (j < 0) j = -1 - j;
if (j <= 0) continue;
TxnId[] newMissing = j == info.missing.length ? NO_TXNIDS : Arrays.copyOfRange(info.missing, j, info.missing.length);
newInfos[i] = info.update(txnIds[i], newMissing);
return new CommandsForKey(key, redundantBefore, maxStableWrite, newTxnIds, newInfos);
public CommandsForKey registerHistorical(TxnId txnId)
int i = Arrays.binarySearch(txnIds, txnId);
if (i >= 0)
return infos[i].status.compareTo(HISTORICAL) >= 0 ? this : update(i, infos[i], HISTORICAL.asNoInfo);
return insert(-1 - i, txnId, HISTORICAL.asNoInfo);
private int insertPos(Timestamp timestamp)
int i = Arrays.binarySearch(txnIds, timestamp);
if (i < 0) i = -1 -i;
return i;
public TxnId findFirst()
return txnIds.length > 0 ? txnIds[0] : null;
public boolean equals(Object o)
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommandsForKey that = (CommandsForKey) o;
return Objects.equals(key, that.key)
&& Objects.equals(redundantBefore, that.redundantBefore)
&& Arrays.equals(txnIds, that.txnIds)
&& Arrays.equals(infos, that.infos);
public int hashCode()
throw new UnsupportedOperationException();