blob: 9e628b924b5d71a8117f8d3ed23f3f66af2f789c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import accord.api.Key;
import accord.impl.CommandsSummary;
import accord.local.SafeCommandStore.CommandFunction;
import accord.local.SafeCommandStore.TestDep;
import accord.local.SafeCommandStore.TestStartedAt;
import accord.local.SafeCommandStore.TestStatus;
import accord.primitives.Ballot;
import accord.primitives.Keys;
import accord.primitives.Participants;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn.Kind;
import accord.primitives.Txn.Kind.Kinds;
import accord.primitives.TxnId;
import accord.utils.ArrayBuffers;
import accord.utils.Invariants;
import accord.utils.RelationMultiMap.SortedRelationList;
import accord.utils.SortedArrays;
import accord.utils.SortedList;
import static accord.local.CommandsForKey.InternalStatus.ACCEPTED;
import static accord.local.CommandsForKey.InternalStatus.APPLIED;
import static accord.local.CommandsForKey.InternalStatus.COMMITTED;
import static accord.local.CommandsForKey.InternalStatus.PREACCEPTED;
import static accord.local.CommandsForKey.InternalStatus.STABLE;
import static accord.local.CommandsForKey.InternalStatus.HISTORICAL;
import static accord.local.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATED;
import static accord.local.CommandsForKey.InternalStatus.TRANSITIVELY_KNOWN;
import static accord.local.CommandsForKey.InternalStatus.from;
import static accord.local.CommandsForKey.Unmanaged.Pending.APPLY;
import static accord.local.CommandsForKey.Unmanaged.Pending.COMMIT;
import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.SaveStatus.LocalExecution.WaitingToApply;
import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
import static accord.primitives.Txn.Kind.Kinds.AnyGloballyVisible;
import static accord.utils.ArrayBuffers.cachedTxnIds;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.isParanoid;
import static accord.utils.SortedArrays.Search.CEIL;
import static accord.utils.SortedArrays.Search.FAST;
/**
* A specialised collection for efficiently representing and querying everything we need for making coordination
* and recovery decisions about a key's command conflicts.
*
* Every command we know about that is not shard-redundant is listed in the TxnId[] collection, which is sorted by TxnId.
* This list implies the contents of the deps of all commands in the collection - it is assumed that in the normal course
* of events every transaction will include the full set of TxnId we know. We only encode divergences from this, stored
* in each command's {@code missing} collection.
*
* We then go one step further, exploiting the fact that the missing collection exists solely to implement recovery,
* and so we elide from this missing collection any TxnId we have recorded as Committed or higher.
* Any recovery coordinator that contacts this replica will report that the command has been agreed to execute,
* and so will not need to decipher any fast-path decisions. So the missing collection is redundant, as no command's deps
* will be queried for this TxnId's presence/absence.
* TODO (expected) this logic applies equally well to Accepted
*
* The goal with these behaviours is that this missing collection will ordinarily be empty, and represented by the exact
* same NO_TXNIDS array instance as every other command.
*
* We also exploit the property that most commands will also agree to execute at their proposed TxnId. If we have
* no missing collection to encode, and no modified executeAt, we store a global NoInfo object that takes up no
* space on heap. These NoInfo objects permit further efficiencies, as we may perform class-pointer comparisons
* before querying any contents to skip uninteresting contents, permitting fast iteration of the collection's contents.
*
* We also impose the condition that every TxnId is uniquely represented in the collection, so any executeAt and missing
* collection that represents the same value as the TxnId must be the same object present in the main TxnId[].
*
* This collection also implements transitive dependency elision.
* When evaluating mapReduceActive, we first establish the last-executing Stable write command (i.e. those whose deps
* are considered durably decided, and so must wait for all commands Committed with a lower executeAt).
* We then elide any Committed command that has a lower executeAt than this command.
*
* Both commands must be known at a majority, but neither might be Committed at any other replica.
* Either command may therefore be recovered.
* If the later command is recovered, this replica will report its Stable deps thereby recovering them.
* If this replica is not contacted, some other replica must participate that either has taken the same action as this replica,
* or else does not know the later command is Stable, and so will report the earlier command as a dependency again.
* If the earlier command is recovered, this replica will report that it is Committed, and so will not consult
* this replica's collection to decipher any fast path decision. Any other replica must either do the same, or else
* will correctly record this transaction as present in any relevant deps of later transactions.
*
* TODO (expected): optimisations:
* 3) consider storing a prefix of TxnId that are all NoInfo PreApplied encoded as a BitStream as only required for computing missing collection
* 4) consider storing (or caching) an int[] of records with an executeAt that occurs out of order, sorted by executeAt
*
* TODO (expected): maintain separate redundantBefore and closedBefore timestamps, latter implied by any exclusivesyncpoint;
* advance former based on Applied status of all TxnId before the latter
* TODO (expected): track whether a TxnId is a write on this key only for execution (rather than globally)
* TODO (expected): merge with TimestampsForKey
* TODO (expected): save bytes by encoding InternalStatus in TxnId.flags()
* TODO (expected): migrate to BTree
* TODO (expected): remove a command that is committed to not intersect with the key for this store (i.e. if accepted in a later epoch than committed on, so ownership changes)
* TODO (expected): mark a command as notified once ready-to-execute or applying
* TODO (required): randomised testing
* TODO (required): linearizability violation detection
* TODO (required): account for whether transactions should witness each other for determining the missing collection
* TODO (required): enforce that a transaction is not added with a key dependency on a SyncPoint or ExclusiveSyncPoint;
* also impose this invariant elsewhere, and create a compile-time dependency between these code locations
*/
public class CommandsForKey implements CommandsSummary
{
// Switch consulted by mapReduceActive: when true, COMMITTED/STABLE/APPLIED entries whose executeAt precedes the
// latest committed write found before the query bound are skipped.
private static final boolean PRUNE_TRANSITIVE_DEPENDENCIES = true;
// Placeholder entry used when no redundancy bounds are supplied (SerializerSupport.create and the single-argument constructor).
public static final RedundantBefore.Entry NO_REDUNDANT_BEFORE = new RedundantBefore.Entry(null, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, null);
// Shared empty array; code in this class compares against it by identity (==) to detect "no missing ids".
public static final TxnId[] NO_TXNIDS = new TxnId[0];
// Shared empty array used for the txns/committed collections of an empty CommandsForKey.
public static final TxnInfo[] NO_INFOS = new TxnInfo[0];
// Shared empty array used for the unmanaged collection of an empty CommandsForKey.
public static final Unmanaged[] NO_PENDING_UNMANAGED = new Unmanaged[0];
/**
 * Immutable record of a transaction that is waiting (to commit or to apply) until some timestamp.
 * Instances order by pending kind first (COMMIT before APPLY), then by {@code waitingUntil}, then by {@code txnId}.
 */
public static class Unmanaged implements Comparable<Unmanaged>
{
    public enum Pending { COMMIT, APPLY }

    public final Pending pending;
    public final Timestamp waitingUntil;
    public final TxnId txnId;

    public Unmanaged(Pending pending, TxnId txnId, Timestamp waitingUntil)
    {
        this.pending = pending;
        this.txnId = txnId;
        this.waitingUntil = waitingUntil;
    }

    @Override
    public int compareTo(Unmanaged that)
    {
        // differing kinds decide the order outright
        if (this.pending != that.pending)
            return this.pending == Pending.COMMIT ? -1 : 1;

        int byWaitingUntil = this.waitingUntil.compareTo(that.waitingUntil);
        return byWaitingUntil != 0 ? byWaitingUntil : this.txnId.compareTo(that.txnId);
    }

    @Override
    public String toString()
    {
        return "Pending{" + txnId + " until:" + waitingUntil + " " + pending + "}";
    }
}
public static class SerializerSupport
{
public static CommandsForKey create(Key key, TxnInfo[] txns, Unmanaged[] unmanageds)
{
return new CommandsForKey(key, NO_REDUNDANT_BEFORE, txns, unmanageds);
}
}
/**
 * The condensed status recorded per transaction in this collection.
 * The declaration order matters: other code in this file compares constants with {@code compareTo}.
 */
public enum InternalStatus
{
    TRANSITIVELY_KNOWN(false, false), // (unwitnessed; no need for mapReduce to witness)
    HISTORICAL(false, false),
    PREACCEPTED(false),
    ACCEPTED(true),
    COMMITTED(true),
    STABLE(true),
    APPLIED(true),
    INVALID_OR_TRUNCATED(false);

    static final EnumMap<SaveStatus, InternalStatus> convert = new EnumMap<>(SaveStatus.class);
    static final InternalStatus[] VALUES = values();

    static
    {
        register(PREACCEPTED,
                 SaveStatus.PreAccepted,
                 SaveStatus.AcceptedInvalidateWithDefinition,
                 SaveStatus.PreCommittedWithDefinition);
        register(ACCEPTED,
                 SaveStatus.Accepted,
                 SaveStatus.AcceptedWithDefinition,
                 SaveStatus.PreCommittedWithAcceptedDeps,
                 SaveStatus.PreCommittedWithDefinitionAndAcceptedDeps);
        register(COMMITTED,
                 SaveStatus.Committed);
        register(STABLE,
                 SaveStatus.Stable,
                 SaveStatus.ReadyToExecute,
                 SaveStatus.PreApplied,
                 SaveStatus.Applying);
        register(APPLIED,
                 SaveStatus.Applied);
        register(INVALID_OR_TRUNCATED,
                 SaveStatus.TruncatedApplyWithDeps,
                 SaveStatus.TruncatedApplyWithOutcome,
                 SaveStatus.TruncatedApply,
                 SaveStatus.ErasedOrInvalidated,
                 SaveStatus.Erased,
                 SaveStatus.Invalidated);
    }

    // records that each of the given SaveStatus values maps to the given InternalStatus
    private static void register(InternalStatus target, SaveStatus... sources)
    {
        for (SaveStatus source : sources)
            convert.put(source, target);
    }

    public final boolean hasInfo;
    // TODO (desired): cleanup expectation logic re: accepted invalidate
    final InternalStatus expectMatch;

    InternalStatus(boolean hasInfo)
    {
        this(hasInfo, true);
    }

    InternalStatus(boolean hasInfo, boolean expectMatch)
    {
        this.hasInfo = hasInfo;
        // either this constant itself, or null when no match is expected
        this.expectMatch = expectMatch ? this : null;
    }

    boolean hasExecuteAt()
    {
        return this.hasInfo;
    }

    boolean hasDeps()
    {
        return this.hasInfo;
    }

    boolean hasStableDeps()
    {
        switch (this)
        {
            case STABLE:
            case APPLIED:
                return true;
            default:
                return false;
        }
    }

    /**
     * The timestamp before which deps are known for a transaction in this status: the txnId while
     * pre-accepted/accepted, otherwise the executeAt (falling back to the txnId when executeAt is null).
     *
     * @throws AssertionError for statuses that carry no deps
     */
    public Timestamp depsKnownBefore(TxnId txnId, @Nullable Timestamp executeAt)
    {
        switch (this)
        {
            case PREACCEPTED:
            case ACCEPTED:
                return txnId;

            case COMMITTED:
            case STABLE:
            case APPLIED:
                if (executeAt == null)
                    return txnId;
                return executeAt;

            case TRANSITIVELY_KNOWN:
            case HISTORICAL:
            case INVALID_OR_TRUNCATED:
                throw new AssertionError("Invalid InternalStatus to know deps");

            default:
                throw new AssertionError("Unhandled InternalStatus: " + this);
        }
    }

    /**
     * @return the InternalStatus registered for {@code status}, or null if none is registered
     */
    @VisibleForTesting
    public static InternalStatus from(SaveStatus status)
    {
        InternalStatus mapped = convert.get(status);
        return mapped;
    }

    public static InternalStatus get(int ordinal)
    {
        final InternalStatus[] all = VALUES;
        return all[ordinal];
    }
}
/**
 * An entry of the per-key collection: a {@link TxnId} together with the {@link InternalStatus} recorded for it
 * and its executeAt. This base class reports an empty missing collection; see {@link TxnInfoWithMissing}.
 *
 * {@code hashCode()} is deliberately unsupported, so instances must not be used as hash keys.
 */
public static class TxnInfo extends TxnId
{
    public final InternalStatus status;
    // refers to this very object when the executeAt supplied at construction was the same reference as the txnId;
    // may be null for instances built via createMock
    public final Timestamp executeAt;

    TxnInfo(TxnId txnId, InternalStatus status, Timestamp executeAt)
    {
        super(txnId);
        this.status = status;
        this.executeAt = executeAt == txnId ? this : executeAt;
    }

    /**
     * Creates an entry whose executeAt is its own txnId.
     */
    public static TxnInfo create(@Nonnull TxnId txnId, InternalStatus status)
    {
        return new TxnInfo(txnId, status, txnId);
    }

    /**
     * Creates an entry with the given executeAt, which must either be the same reference as {@code txnId}
     * or a value not equal to it.
     */
    public static TxnInfo create(@Nonnull TxnId txnId, InternalStatus status, @Nonnull Timestamp executeAt)
    {
        Invariants.checkState(executeAt == txnId || !executeAt.equals(txnId));
        return new TxnInfo(txnId, status, executeAt);
    }

    /**
     * As {@link #create(TxnId, InternalStatus, Timestamp)}, additionally recording {@code missing}
     * unless it is the shared {@code NO_TXNIDS} instance.
     */
    public static TxnInfo create(@Nonnull TxnId txnId, InternalStatus status, @Nonnull Timestamp executeAt, @Nonnull TxnId[] missing)
    {
        Invariants.checkState(executeAt == txnId || !executeAt.equals(txnId));
        if (missing == NO_TXNIDS) return new TxnInfo(txnId, status, executeAt);
        return new TxnInfoWithMissing(txnId, status, executeAt, missing);
    }

    /**
     * Creates an entry that may have a null executeAt and a null or empty missing collection.
     */
    public static TxnInfo createMock(TxnId txnId, InternalStatus status, @Nullable Timestamp executeAt, @Nullable TxnId[] missing)
    {
        Invariants.checkState(executeAt == null || executeAt == txnId || !executeAt.equals(txnId));
        Invariants.checkArgument(missing == null || missing == NO_TXNIDS);
        if (missing == NO_TXNIDS) return new TxnInfo(txnId, status, executeAt);
        return new TxnInfoWithMissing(txnId, status, executeAt, missing);
    }

    Timestamp depsKnownBefore()
    {
        return status.depsKnownBefore(this, executeAt);
    }

    /**
     * @return a copy of this entry carrying {@code newMissing}; a plain TxnInfo when it is {@code NO_TXNIDS}
     */
    public TxnInfo update(TxnId[] newMissing)
    {
        return newMissing == NO_TXNIDS ? new TxnInfo(this, status, executeAt)
                                       : new TxnInfoWithMissing(this, status, executeAt, newMissing);
    }

    /**
     * Compares runtime class, status, executeAt (treating "executes at own txnId" as a distinct state) and missing.
     */
    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TxnInfo info = (TxnInfo) o;
        return status == info.status
               && (executeAt == this ? info.executeAt == info : Objects.equals(executeAt, info.executeAt))
               && Arrays.equals(missing(), info.missing());
    }

    TxnId asPlainTxnId()
    {
        return new TxnId(this);
    }

    public TxnId[] missing()
    {
        return NO_TXNIDS;
    }

    @Override
    public int hashCode()
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public String toString()
    {
        // String.valueOf keeps this safe for mock instances whose executeAt is null,
        // matching the null-tolerant rendering in TxnInfoWithMissing.toString
        return "Info{" +
               "txnId=" + toPlainString() +
               ", status=" + status +
               ", executeAt=" + (executeAt == this ? toPlainString() : String.valueOf(executeAt)) +
               '}';
    }

    /**
     * @return the rendering produced by the superclass {@code toString()}
     */
    public String toPlainString()
    {
        return super.toString();
    }
}
/**
 * A {@link TxnInfo} that additionally carries an explicit missing collection.
 */
public static class TxnInfoWithMissing extends TxnInfo
{
    public final TxnId[] missing;

    TxnInfoWithMissing(TxnId txnId, InternalStatus status, Timestamp executeAt, TxnId[] missing)
    {
        super(txnId, status, executeAt);
        this.missing = missing;
    }

    @Override
    public TxnId[] missing()
    {
        return this.missing;
    }

    @Override
    public String toString()
    {
        // render the executeAt as the plain txnId when it is this very object
        final Object shownExecuteAt = this == executeAt ? toPlainString() : executeAt;
        return "Info{" +
               "txnId=" + toPlainString() +
               ", status=" + status +
               ", executeAt=" + shownExecuteAt +
               ", missing=" + Arrays.toString(missing) +
               '}';
    }
}
// the key this collection describes
private final Key key;
// redundancy bounds for this key; never null (checked in the main constructor)
private final RedundantBefore.Entry redundantBefore;
// all transactions, sorted by TxnId
private final TxnInfo[] txns;
// minUncommitted: first entry (in TxnId order) that is neither committed-or-later nor INVALID_OR_TRUNCATED
// next and nextWrite are only non-null if they and their dependencies are all committed
// (the constructor nulls them when minUncommitted sorts before their executeAt)
private final TxnInfo minUncommitted, next, nextWrite;
// transactions that are committed or stable plus MAYBE Applied, sorted by executeAt
private final TxnInfo[] committed;
// entries of type Unmanaged associated with this key
private final Unmanaged[] unmanageds;
/**
 * Builds the collection from its raw contents and derives the summary fields
 * ({@code minUncommitted}, {@code next}, {@code nextWrite}, {@code committed}) by scanning {@code txns}.
 *
 * @param redundantBefore must not be null
 * @param txns            entries sorted by TxnId (verified only when paranoid checks are enabled)
 */
CommandsForKey(Key key, RedundantBefore.Entry redundantBefore, TxnInfo[] txns, Unmanaged[] unmanageds)
{
    this.key = key;
    this.redundantBefore = Invariants.nonNull(redundantBefore);
    this.txns = txns;
    this.unmanageds = unmanageds;
    if (isParanoid()) Invariants.checkArgument(SortedArrays.isSortedUnique(txns));

    // TODO (required): update these efficiently
    int committedTotal = 0;
    TxnInfo firstUncommitted = null, earliestWrite = null, earliest = null;
    for (TxnInfo txn : txns)
    {
        final InternalStatus status = txn.status;
        if (status == INVALID_OR_TRUNCATED)
            continue;

        if (status.compareTo(COMMITTED) < 0)
        {
            if (firstUncommitted == null)
                firstUncommitted = txn;
            continue;
        }

        ++committedTotal;
        // applied entries are counted as committed but are never candidates for next/nextWrite
        if (status.compareTo(APPLIED) >= 0)
            continue;

        if (txn.kind().isWrite() && (earliestWrite == null || earliestWrite.executeAt.compareTo(txn.executeAt) > 0))
            earliestWrite = txn;
        if (earliest == null || earliest.executeAt.compareTo(txn.executeAt) > 0)
            earliest = txn;
    }

    if (firstUncommitted != null)
    {
        // TODO (expected): discount minUncommitted if not a dependency of next/nextWrite
        if (earliest != null && firstUncommitted.compareTo(earliest.executeAt) < 0)
            earliest = null;
        if (earliestWrite != null && firstUncommitted.compareTo(earliestWrite.executeAt) < 0)
            earliestWrite = null;
    }

    this.minUncommitted = firstUncommitted;
    this.nextWrite = earliestWrite;
    this.next = earliest;

    final TxnInfo[] byExecuteAt = new TxnInfo[committedTotal];
    int filled = 0;
    for (TxnInfo txn : txns)
    {
        if (txn.status.compareTo(COMMITTED) >= 0 && txn.status != INVALID_OR_TRUNCATED)
            byExecuteAt[filled++] = txn;
    }
    Arrays.sort(byExecuteAt, Comparator.comparing(c -> c.executeAt));
    this.committed = byExecuteAt;
}
/**
 * Creates an empty collection for {@code key}: no transactions, no unmanaged entries and no redundancy bounds.
 */
public CommandsForKey(Key key)
{
    this.key = key;
    this.redundantBefore = NO_REDUNDANT_BEFORE;

    // shared empty arrays
    this.txns = NO_INFOS;
    this.committed = NO_INFOS;
    this.unmanageds = NO_PENDING_UNMANAGED;

    // nothing to summarise
    this.minUncommitted = null;
    this.next = null;
    this.nextWrite = null;
}
/**
 * Renders the identity hash of this instance followed by its key.
 */
@Override
public String toString()
{
    return new StringBuilder("CommandsForKey@")
           .append(System.identityHashCode(this))
           .append('{')
           .append(key)
           .append('}')
           .toString();
}
/**
 * @return the key this collection describes
 */
public Key key()
{
    return this.key;
}
/**
 * @return the number of transactions recorded in {@code txns}
 */
public int size()
{
    final TxnInfo[] all = this.txns;
    return all.length;
}
/**
 * @return the number of unmanaged entries
 */
public int unmanagedCount()
{
    final Unmanaged[] all = this.unmanageds;
    return all.length;
}
/**
 * Binary-searches {@code txns} for {@code txnId}.
 *
 * @return the index if present, otherwise the negative result of {@link Arrays#binarySearch(Object[], Object)}
 */
public int indexOf(TxnId txnId)
{
    final TxnInfo[] sorted = this.txns;
    return Arrays.binarySearch(sorted, txnId);
}
/**
 * @return the entry at index {@code i} of {@code txns}, typed as a TxnId
 */
public TxnId txnId(int i)
{
    return this.txns[i];
}
/**
 * @return the entry at index {@code i} of {@code txns}
 */
public TxnInfo get(int i)
{
    return this.txns[i];
}
/**
 * @return the unmanaged entry at index {@code i}
 */
public Unmanaged getUnmanaged(int i)
{
    return this.unmanageds[i];
}
/**
 * @return the entry recorded for {@code txnId}, or null if it is not present
 */
public TxnInfo get(TxnId txnId)
{
    final int index = indexOf(txnId);
    if (index >= 0)
        return txns[index];
    return null;
}
/**
 * @return the redundancy bounds held by this collection
 */
public RedundantBefore.Entry redundantBefore()
{
    return this.redundantBefore;
}
/**
 * @return {@code locallyRedundantBefore()} of the held redundancy bounds
 */
public TxnId locallyRedundantBefore()
{
    final RedundantBefore.Entry bounds = this.redundantBefore;
    return bounds.locallyRedundantBefore();
}
/**
 * @return {@code shardRedundantBefore()} of the held redundancy bounds
 */
public TxnId shardRedundantBefore()
{
    final RedundantBefore.Entry bounds = this.redundantBefore;
    return bounds.shardRedundantBefore();
}
/**
 * All commands before/after (exclusive of) the given timestamp
 * <p>
 * Note that {@code testDep} applies only to commands that MAY have the command in their deps; if specified any
 * commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId.
 * <p>
 * Entries are visited in TxnId order and folded through {@code map}, starting from {@code initialValue}.
 *
 * @param testTxnId     the transaction whose position bounds the scan and whose presence in deps is tested
 * @param testKind      only entries whose kind passes this filter are visited
 * @param testStartedAt selects the range of {@code txns} scanned relative to {@code testTxnId}'s position
 * @param testDep       {@code ANY_DEPS} to skip dependency testing; otherwise whether the entry must have
 *                      ({@code WITH}) or must not have {@code testTxnId} as a dependency
 * @param testStatus    status filter applied to each entry
 * @return the folded value; {@code initialValue} unchanged if {@code testDep == WITH} and {@code testTxnId} is unknown
 */
public <P1, T> T mapReduceFull(TxnId testTxnId,
                               Kinds testKind,
                               TestStartedAt testStartedAt,
                               TestDep testDep,
                               TestStatus testStatus,
                               CommandFunction<P1, T, T> map, P1 p1, T initialValue)
{
    int start, end;
    boolean isKnown;
    {
        int insertPos = Arrays.binarySearch(txns, testTxnId);
        isKnown = insertPos >= 0;
        // nothing can list an unknown transaction as a dependency
        if (!isKnown && testDep == WITH) return initialValue;
        if (!isKnown) insertPos = -1 - insertPos;

        switch (testStartedAt)
        {
            // report the unhandled enum value itself (previously the txnId was reported by mistake)
            default: throw new AssertionError("Unhandled TestStartedAt: " + testStartedAt);
            case STARTED_BEFORE: start = 0; end = insertPos; break;
            case STARTED_AFTER: start = insertPos; end = txns.length; break;
            case ANY: start = 0; end = txns.length; break;
        }
    }

    for (int i = start; i < end ; ++i)
    {
        TxnInfo txn = txns[i];
        if (!testKind.test(txn.kind())) continue;

        InternalStatus status = txn.status;
        switch (testStatus)
        {
            default: throw new AssertionError("Unhandled TestStatus: " + testStatus);
            case IS_PROPOSED:
                if (status == ACCEPTED || status == COMMITTED) break;
                else continue;
            case IS_STABLE:
                if (status.compareTo(STABLE) >= 0 && status.compareTo(INVALID_OR_TRUNCATED) < 0) break;
                else continue;
            case ANY_STATUS:
                if (status == TRANSITIVELY_KNOWN)
                    continue;
        }

        Timestamp executeAt = txn.executeAt;
        if (testDep != ANY_DEPS)
        {
            // entries without info know no deps
            if (!status.hasInfo)
                continue;

            // entries executing at or before testTxnId are ignored for dependency testing
            if (executeAt.compareTo(testTxnId) <= 0)
                continue;

            // a dependency is implied unless testTxnId is explicitly listed as missing
            boolean hasAsDep = Arrays.binarySearch(txn.missing(), testTxnId) < 0;
            if (hasAsDep != (testDep == WITH))
                continue;
        }

        initialValue = map.apply(p1, key, txn.asPlainTxnId(), executeAt, initialValue);
    }
    return initialValue;
}
/**
 * Folds {@code map} over the entries of {@code txns} located before {@code insertPos(0, startedBefore)},
 * skipping TRANSITIVELY_KNOWN and INVALID_OR_TRUNCATED entries and, when pruning is enabled, any
 * COMMITTED/STABLE/APPLIED entry whose executeAt precedes that of the last write found in {@code committed}
 * before {@code startedBefore}.
 *
 * @param testKind only entries whose kind passes this filter are visited
 * @return the folded value, starting from {@code initialValue}
 */
public <P1, T> T mapReduceActive(Timestamp startedBefore,
                                 Kinds testKind,
                                 CommandFunction<P1, T, T> map, P1 p1, T initialValue)
{
    // step back from the search result to the last write in `committed` positioned before startedBefore
    int writeIndex = SortedArrays.binarySearch(committed, 0, committed.length, startedBefore, (find, info) -> find.compareTo(info.executeAt), FAST);
    writeIndex = writeIndex < 0 ? -2 - writeIndex : writeIndex - 1;
    while (writeIndex >= 0 && !committed[writeIndex].kind().isWrite())
        --writeIndex;
    final Timestamp maxCommittedBefore = writeIndex >= 0 ? committed[writeIndex].executeAt : null;

    final int end = insertPos(0, startedBefore);
    T result = initialValue;
    for (int i = 0; i < end; ++i)
    {
        final TxnInfo txn = txns[i];
        if (!testKind.test(txn.kind()))
            continue;

        final InternalStatus status = txn.status;
        if (status == TRANSITIVELY_KNOWN || status == INVALID_OR_TRUNCATED)
            continue;

        if (status == COMMITTED || status == STABLE || status == APPLIED)
        {
            // TODO (expected): prove the correctness of this approach
            boolean elide = PRUNE_TRANSITIVE_DEPENDENCIES
                            && maxCommittedBefore != null
                            && txn.executeAt.compareTo(maxCommittedBefore) < 0;
            if (elide)
                continue;
        }

        result = map.apply(p1, key, txn.asPlainTxnId(), txn.executeAt, result);
    }
    return result;
}
/**
 * Applies the transition from {@code prev} to {@code next}, validating {@code prev} against the recorded state.
 */
public CommandsForKey update(Command prev, Command next)
{
    final boolean hasPrev = true;
    return update(prev, next, hasPrev);
}
/**
 * Returns a CommandsForKey reflecting the state of {@code next}, or {@code this} when there is nothing to record
 * (the save status has no InternalStatus mapping, or the recorded entry is already at least as advanced).
 *
 * @param prev    the prior state of the command; consulted only when {@code hasPrev} is true
 * @param next    the new state of the command
 * @param hasPrev when true, {@code prev} is checked against the status currently recorded for the transaction
 */
CommandsForKey update(Command prev, Command next, boolean hasPrev)
{
InternalStatus newStatus = InternalStatus.from(next.saveStatus());
// save statuses without a mapping are not tracked by this collection
if (newStatus == null)
return this;
TxnId txnId = next.txnId();
int pos = Arrays.binarySearch(txns, txnId);
CommandsForKey result;
if (pos < 0)
{
// not yet present: convert the negative search result into an insertion point
pos = -1 - pos;
if (newStatus.hasInfo) result = insert(pos, txnId, newStatus, next);
else result = insert(pos, txnId, TxnInfo.create(txnId, newStatus, txnId));
}
else
{
// update
TxnInfo cur = txns[pos];
if (hasPrev)
{
if (newStatus.compareTo(cur.status) <= 0)
{
// we can redundantly update the same transaction via notifyWaitingOnCommit since updates to CFK may be asynchronous
// (particularly for invalidations). So we should expect that we might already represent the latest information for this transaction.
// TODO (desired): consider only accepting this for Invalidation
// TODO (desired): also clean-up special casing for AcceptedInvalidate, which exists because it currently has no effect on the CFK state
// so it could be any of Transitively Known, Historic, PreAccept or Accept
Invariants.checkState(cur.status == newStatus || next.status() == Status.AcceptedInvalidate);
// NOTE(review): prev is dereferenced here without a null check, although the statement after this block
// tolerates prev == null - confirm callers never reach this path with a null prev
if (!newStatus.hasInfo || next.acceptedOrCommitted().equals(prev.acceptedOrCommitted()))
return this;
}
InternalStatus prevStatus = prev == null ? null : InternalStatus.from(prev.saveStatus());
// the recorded status must agree with prev, unless no match is expected and prev does not contain this key,
// or prev was AcceptedInvalidate
Invariants.checkState(cur.status.expectMatch == prevStatus || cur.status.expectMatch == null && !prev.keysOrRanges().contains(key) || (prev != null && prev.status() == Status.AcceptedInvalidate));
}
else if (newStatus.compareTo(cur.status) <= 0)
{
Invariants.checkState(cur.status == newStatus || next.status() == Status.AcceptedInvalidate);
if (!newStatus.hasInfo)
return this;
}
if (newStatus.hasInfo) result = update(pos, txnId, cur, newStatus, next);
else result = update(pos, txnId, cur, TxnInfo.create(txnId, newStatus, txnId));
}
return result;
}
/**
 * Decides whether the transition from {@code prev} to {@code updated} must be applied to a CommandsForKey.
 * Transactions whose kind is not globally visible never need an update; a null {@code prev} is treated as
 * {@code SaveStatus.NotDefined} with {@code Ballot.ZERO}.
 */
public static boolean needsUpdate(Command prev, Command updated)
{
    if (!updated.txnId().kind().isGloballyVisible())
        return false;

    final boolean noPrev = prev == null;
    final SaveStatus prevStatus = noPrev ? SaveStatus.NotDefined : prev.saveStatus();
    final Ballot prevAcceptedOrCommitted = noPrev ? Ballot.ZERO : prev.acceptedOrCommitted();
    return needsUpdate(prevStatus, prevAcceptedOrCommitted, updated.saveStatus(), updated.acceptedOrCommitted());
}
/**
 * An update is needed when the mapped InternalStatus changes, or when it is unchanged but carries info
 * and the accepted-or-committed ballot differs.
 */
public static boolean needsUpdate(SaveStatus prevStatus, Ballot prevAcceptedOrCommitted, SaveStatus updatedStatus, Ballot updatedAcceptedOrCommitted)
{
    final InternalStatus before = InternalStatus.from(prevStatus);
    final InternalStatus after = InternalStatus.from(updatedStatus);
    if (after != before)
        return true;
    if (after == null || !after.hasInfo)
        return false;
    return !prevAcceptedOrCommitted.equals(updatedAcceptedOrCommitted);
}
/**
 * Inserts a transaction that is not yet present, deriving its info from {@code command}.
 * When the computation also yields additional transaction ids, those are merged in as well.
 */
private CommandsForKey insert(int insertPos, TxnId insertTxnId, InternalStatus newStatus, Command command)
{
    final Object computed = computeInsert(insertPos, insertTxnId, newStatus, command);
    if (computed.getClass() == InfoWithAdditions.class)
    {
        final InfoWithAdditions withAdditions = (InfoWithAdditions) computed;
        // room for the existing entries, the additions, and the inserted transaction itself
        final TxnInfo[] newTxns = new TxnInfo[txns.length + withAdditions.additionCount + 1];
        insertWithAdditions(insertPos, insertTxnId, withAdditions, newTxns);
        return update(newTxns);
    }
    return insert(insertPos, insertTxnId, (TxnInfo) computed);
}
/**
 * Replaces the entry at {@code updatePos}, deriving its new info from {@code command}.
 * When the computation also yields additional transaction ids, those are merged in as well; if the entry
 * moves from below COMMITTED to COMMITTED or later, it is removed from every missing collection.
 */
private CommandsForKey update(int updatePos, TxnId plainTxnId, TxnInfo prevInfo, InternalStatus newStatus, Command command)
{
    final Object computed = computeUpdate(updatePos, plainTxnId, newStatus, command);
    if (computed.getClass() == InfoWithAdditions.class)
    {
        final InfoWithAdditions withAdditions = (InfoWithAdditions) computed;
        final TxnInfo[] newTxns = new TxnInfo[txns.length + withAdditions.additionCount];
        updateWithAdditions(updatePos, plainTxnId, withAdditions, newTxns);

        final boolean becameCommitted = prevInfo.status.compareTo(COMMITTED) < 0 && newStatus.compareTo(COMMITTED) >= 0;
        if (becameCommitted)
            removeMissing(newTxns, plainTxnId);
        return update(newTxns);
    }
    return update(updatePos, plainTxnId, prevInfo, (TxnInfo) computed);
}
/**
 * Merge variant for an existing entry: the insert position and the update position coincide.
 */
private void updateWithAdditions(int updatePos, TxnId updateTxnId, InfoWithAdditions withInfo, TxnInfo[] newInfos)
{
    final int sourceInsertPos = updatePos;
    final int sourceUpdatePos = updatePos;
    updateOrInsertWithAdditions(sourceInsertPos, sourceUpdatePos, updateTxnId, withInfo, newInfos);
}
/**
 * Merge variant for a new entry: there is no existing position to replace, signalled by -1.
 */
private void insertWithAdditions(int pos, TxnId updateTxnId, InfoWithAdditions withInfo, TxnInfo[] newInfos)
{
    final int noUpdatePos = -1;
    updateOrInsertWithAdditions(pos, noUpdatePos, updateTxnId, withInfo, newInfos);
}
/**
 * Merges the existing {@code txns} with the sorted {@code withInfo.additions} (its first {@code additionCount}
 * entries) and {@code withInfo.info} into {@code newTxns}. Each addition becomes a TRANSITIVELY_KNOWN entry, and
 * existing entries that have deps may have their missing collection extended with the additions (and, for an
 * insert below COMMITTED, with the inserted transaction itself).
 *
 * @param sourceInsertPos  position within {@code txns} at which the updated/inserted transaction belongs
 * @param sourceUpdatePos  index within {@code txns} of the entry being replaced, or negative for an insert
 * @param updatePlainTxnId the id of the updated/inserted transaction; must not occur in the additions
 * @param withInfo         the new info and the additions; the additions buffer is handed back to the cache on exit
 * @param newTxns          output array, expected to be sized by the caller to hold the merged result
 */
private void updateOrInsertWithAdditions(int sourceInsertPos, int sourceUpdatePos, TxnId updatePlainTxnId, InfoWithAdditions withInfo, TxnInfo[] newTxns)
{
TxnId[] additions = withInfo.additions;
int additionCount = withInfo.additionCount;
// locate where the updated/inserted id sorts among the additions; it must not itself be an addition
int additionInsertPos = Arrays.binarySearch(additions, 0, additionCount, updatePlainTxnId);
additionInsertPos = Invariants.checkArgument(-1 - additionInsertPos, additionInsertPos < 0);
// position of the new info in the merged output: source position shifted by the additions sorting before it
int targetInsertPos = sourceInsertPos + additionInsertPos;
// additions plus the updateTxnId when necessary
TxnId[] missingSource = additions;
// only a pure insert of a not-yet-committed transaction is itself added to other entries' missing collections
boolean insertSelfMissing = sourceUpdatePos < 0 && withInfo.info.status.compareTo(COMMITTED) < 0;
// the most recently constructed pure insert missing array, so that it may be reused if possible
// i: read index into txns; j: read index into additions; count: write index into newTxns
// missingCount: number of missingSource entries emitted so far; missingLimit: usable length of missingSource
int i = 0, j = 0, missingCount = 0, missingLimit = additionCount, count = 0;
while (i < txns.length)
{
if (count == targetInsertPos)
{
// emit the new info; for an update also consume the entry it replaces
newTxns[count] = withInfo.info;
if (i == sourceUpdatePos) ++i;
else if (insertSelfMissing) ++missingCount;
++count;
continue;
}
// once additions are exhausted, always take from txns
int c = j == additionCount ? -1 : txns[i].compareTo(additions[j]);
if (c < 0)
{
TxnInfo txn = txns[i];
if (i == sourceUpdatePos)
{
txn = withInfo.info;
}
else if (txn.status.hasDeps())
{
Timestamp depsKnownBefore = txn.depsKnownBefore();
// lazily switch to a source array that also contains the inserted id, the first time it is needed
if (insertSelfMissing && missingSource == additions && (missingCount != j || (depsKnownBefore != txn && depsKnownBefore.compareTo(updatePlainTxnId) > 0)))
{
missingSource = insertMissing(additions, additionCount, updatePlainTxnId, additionInsertPos);
++missingLimit;
}
// number of leading missingSource entries to consider for this entry
int to = to(txn, depsKnownBefore, missingSource, missingCount, missingLimit);
if (to > 0)
{
TxnId[] prevMissing = txn.missing();
TxnId[] newMissing = mergeAndFilterMissing(txn, prevMissing, missingSource, to);
if (newMissing != prevMissing)
txn = txn.update(newMissing);
}
}
newTxns[count] = txn;
i++;
}
else if (c > 0)
{
// the next addition sorts first: emit it as a TRANSITIVELY_KNOWN entry
TxnId txnId = additions[j++];
newTxns[count] = TxnInfo.create(txnId, TRANSITIVELY_KNOWN, txnId);
++missingCount;
}
else
{
throw illegalState(txns[i] + " should be an insertion, but found match when merging with origin");
}
count++;
}
// txns is exhausted: flush remaining additions, placing the new info at its target position if not yet emitted
if (j < additionCount)
{
if (count <= targetInsertPos)
{
while (count < targetInsertPos)
{
TxnId txnId = additions[j++];
newTxns[count++] = TxnInfo.create(txnId, TRANSITIVELY_KNOWN, txnId);
}
newTxns[targetInsertPos] = withInfo.info;
count = targetInsertPos + 1;
}
while (j < additionCount)
{
TxnId txnId = additions[j++];
newTxns[count++] = TxnInfo.create(txnId, TRANSITIVELY_KNOWN, txnId);
}
}
else if (count == targetInsertPos)
{
newTxns[targetInsertPos] = withInfo.info;
}
// hand the additions buffer back to the cache
cachedTxnIds().forceDiscard(additions, additionCount);
}
/**
 * Replaces the entry at {@code pos} with {@code newInfo} in a copy of {@code txns}. If the entry moves from
 * below COMMITTED to COMMITTED or later, it is also removed from every missing collection.
 *
 * @return {@code this} when {@code newInfo} is the very same object as {@code curInfo}
 */
private CommandsForKey update(int pos, TxnId plainTxnId, TxnInfo curInfo, TxnInfo newInfo)
{
    if (curInfo == newInfo)
        return this;

    final TxnInfo[] copy = Arrays.copyOf(txns, txns.length);
    copy[pos] = newInfo;

    final boolean becameCommitted = curInfo.status.compareTo(COMMITTED) < 0
                                    && newInfo.status.compareTo(COMMITTED) >= 0;
    if (becameCommitted)
        removeMissing(copy, plainTxnId);
    return update(copy);
}
/**
 * Insert a new txnId and info.
 * An entry that is COMMITTED or later is simply spliced in; any other entry is additionally recorded as
 * missing from existing entries via {@code insertInfoAndOneMissing}.
 */
private CommandsForKey insert(int pos, TxnId insertPlainTxnId, TxnInfo insert)
{
    final TxnInfo[] newTxns = new TxnInfo[txns.length + 1];
    final boolean belowCommitted = insert.status.compareTo(COMMITTED) < 0;
    if (belowCommitted)
    {
        insertInfoAndOneMissing(pos, insertPlainTxnId, insert, txns, newTxns, 1);
    }
    else
    {
        // plain splice: prefix, the new entry, then the shifted suffix
        System.arraycopy(txns, 0, newTxns, 0, pos);
        newTxns[pos] = insert;
        System.arraycopy(txns, pos, newTxns, pos + 1, txns.length - pos);
    }
    return update(newTxns);
}
/**
 * Insert a new txnId and info, then insert the txnId into the missing collection of any command that should have already caused us to witness it
 *
 * @param insertPos            index in {@code oldTxns} at which {@code inserted} belongs
 * @param offsetAfterInsertPos shift applied to the output index of entries at or after {@code insertPos}
 */
private static void insertInfoAndOneMissing(int insertPos, TxnId insertedPlainTxnId, TxnInfo inserted, TxnInfo[] oldTxns, TxnInfo[] newTxns, int offsetAfterInsertPos)
{
    // single-element missing array, created on first use and shared between all entries that need it
    TxnId[] sharedSingleton = null;

    // entries before the insert position: affected only if their deps extend past the inserted id
    for (int i = 0; i < insertPos; ++i)
    {
        TxnInfo txn = oldTxns[i];
        boolean shouldHaveWitnessed = false;
        if (txn.status.hasDeps())
        {
            Timestamp knownBefore = txn.depsKnownBefore();
            shouldHaveWitnessed = knownBefore != null
                                  && knownBefore.compareTo(inserted) > 0
                                  && txn.kind().witnesses(inserted);
        }
        if (shouldHaveWitnessed)
        {
            TxnId[] prior = txn.missing();
            TxnId[] extended;
            if (prior != NO_TXNIDS)
            {
                extended = SortedArrays.insert(prior, insertedPlainTxnId, TxnId[]::new);
            }
            else
            {
                sharedSingleton = ensureOneMissing(insertedPlainTxnId, sharedSingleton);
                extended = sharedSingleton;
            }
            txn = txn.update(extended);
        }
        newTxns[i] = txn;
    }

    newTxns[insertPos] = inserted;

    // entries at or after the insert position: affected whenever they have deps and witness the inserted kind
    for (int i = insertPos; i < oldTxns.length; ++i)
    {
        TxnInfo txn = oldTxns[i];
        if (txn.status.hasDeps() && txn.kind().witnesses(inserted))
        {
            TxnId[] prior = txn.missing();
            TxnId[] extended;
            if (prior != NO_TXNIDS)
            {
                extended = SortedArrays.insert(prior, insertedPlainTxnId, TxnId[]::new);
            }
            else
            {
                sharedSingleton = ensureOneMissing(insertedPlainTxnId, sharedSingleton);
                extended = sharedSingleton;
            }
            txn = txn.update(extended);
        }
        newTxns[i + offsetAfterInsertPos] = txn;
    }
}
/**
 * Removes {@code removeTxnId} from the missing collection of every entry of {@code txns} that lists it,
 * replacing affected entries in place.
 */
private static void removeMissing(TxnInfo[] txns, TxnId removeTxnId)
{
    for (int i = 0; i < txns.length; ++i)
    {
        final TxnInfo txn = txns[i];
        final TxnId[] missing = txn.missing();
        if (missing == NO_TXNIDS)
            continue;

        final int at = Arrays.binarySearch(missing, removeTxnId);
        if (at < 0)
            continue;

        TxnId[] remaining;
        if (missing.length != 1)
        {
            // copy everything except the element at `at`
            remaining = new TxnId[missing.length - 1];
            System.arraycopy(missing, 0, remaining, 0, at);
            System.arraycopy(missing, at + 1, remaining, at, missing.length - (1 + at));
        }
        else
        {
            remaining = NO_TXNIDS;
        }
        txns[i] = txn.update(remaining);
    }
}
/**
 * @return {@code oneMissing} if already created, otherwise a new single-element array holding {@code txnId}
 */
private static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing)
{
    if (oneMissing == null)
        return new TxnId[] { txnId };
    return oneMissing;
}
/**
 * @return a new array holding the first {@code additionCount} entries of {@code additions} with
 *         {@code updateTxnId} spliced in at {@code additionInsertPos}
 */
private static TxnId[] insertMissing(TxnId[] additions, int additionCount, TxnId updateTxnId, int additionInsertPos)
{
    final TxnId[] spliced = new TxnId[additionCount + 1];
    final int tailLength = additionCount - additionInsertPos;
    System.arraycopy(additions, 0, spliced, 0, additionInsertPos);
    spliced[additionInsertPos] = updateTxnId;
    System.arraycopy(additions, additionInsertPos, spliced, additionInsertPos + 1, tailLength);
    return spliced;
}
/**
 * Merges into {@code current} those of the first {@code additionCount} entries of {@code additions} whose kind
 * is witnessed by {@code owner}'s kind.
 *
 * @return {@code current} itself when no addition qualifies, otherwise the merged array
 */
private static TxnId[] mergeAndFilterMissing(TxnId owner, TxnId[] current, TxnId[] additions, int additionCount)
{
    final Kinds witnessed = owner.kind().witnesses();

    // count the additions that survive the kind filter
    int accepted = 0;
    for (int k = 0; k < additionCount; ++k)
    {
        if (witnessed.test(additions[k].kind()))
            ++accepted;
    }
    if (accepted == 0)
        return current;

    final TxnId[] merged = cachedTxnIds().get(current.length + accepted);
    int a = 0, c = 0, size = 0;
    while (a < additionCount && c < current.length)
    {
        final TxnId candidate = additions[a];
        if (!witnessed.test(candidate.kind()))
        {
            ++a;
            continue;
        }
        if (candidate.compareTo(current[c]) < 0)
        {
            merged[size++] = candidate;
            ++a;
        }
        else
        {
            merged[size++] = current[c++];
        }
    }
    // drain whichever input remains
    for (; a < additionCount; ++a)
    {
        final TxnId candidate = additions[a];
        if (witnessed.test(candidate.kind()))
            merged[size++] = candidate;
    }
    while (c < current.length)
        merged[size++] = current[c++];

    Invariants.checkState(size == accepted + current.length);
    return cachedTxnIds().completeAndDiscard(merged, size);
}
// Returns missingCount when depsKnownBefore is the very same object as txnId (identity, not equality);
// otherwise the index of depsKnownBefore within missingSource[0..missingLimit), or its insertion point
// if it is not present.
private static int to(TxnId txnId, Timestamp depsKnownBefore, TxnId[] missingSource, int missingCount, int missingLimit)
{
    if (depsKnownBefore != txnId)
    {
        int found = Arrays.binarySearch(missingSource, 0, missingLimit, depsKnownBefore);
        return found >= 0 ? found : -1 - found;
    }
    return missingCount;
}
// Simple holder pairing a computed TxnInfo with a buffer of additional TxnIds.
// computeInfoAndAdditions returns an instance of this class (rather than a bare TxnInfo) when it
// collected at least one dependency that it did not match against the existing txns array.
static class InfoWithAdditions
{
// the info computed for the transaction being inserted or updated
final TxnInfo info;
// buffer holding the additional TxnIds; it may be longer than additionCount
final TxnId[] additions;
// number of leading entries of additions that are populated
final int additionCount;
InfoWithAdditions(TxnInfo info, TxnId[] additions, int additionCount)
{
this.info = info;
this.additions = additions;
this.additionCount = additionCount;
}
}
// Computes the info (and any additions) for a transaction that is not yet present in txns and would be
// inserted at insertPos. There is no existing entry to exclude, so the update position is passed as -1.
private Object computeInsert(int insertPos, TxnId txnId, InternalStatus newStatus, Command command)
{
    final int noExistingEntry = -1;
    return computeInfoAndAdditions(insertPos, noExistingEntry, txnId, newStatus, command);
}
// Computes the info (and any additions) for a transaction already present in txns at updatePos.
// The existing entry's index doubles as the insert position.
private Object computeUpdate(int updatePos, TxnId txnId, InternalStatus newStatus, Command command)
{
    final int insertPos = updatePos;
    return computeInfoAndAdditions(insertPos, updatePos, txnId, newStatus, command);
}
// Derives executeAt and depsKnownBefore for the command and delegates to the detailed overload,
// passing the command's key dependencies for this key.
private Object computeInfoAndAdditions(int insertPos, int updatePos, TxnId txnId, InternalStatus newStatus, Command command)
{
    Timestamp executeAt;
    if (!newStatus.hasInfo)
    {
        executeAt = txnId;
    }
    else
    {
        Timestamp commandExecuteAt = command.executeAt();
        // normalise to the txnId instance itself when equal, as the detailed overload tests
        // depsKnownBefore against the TxnId by identity
        executeAt = commandExecuteAt.equals(txnId) ? txnId : commandExecuteAt;
    }

    Timestamp depsKnownBefore = newStatus.depsKnownBefore(txnId, executeAt);
    SortedList<TxnId> deps = command.partialDeps().keyDeps.txnIds(key);
    return computeInfoAndAdditions(insertPos, updatePos, txnId, newStatus, executeAt, depsKnownBefore, deps);
}
// Builds the TxnInfo for plainTxnId by walking the recorded txns and the supplied deps in parallel.
//
// insertPos:        index in txns at which plainTxnId sits (or would be inserted)
// updatePos:        index of the existing entry for plainTxnId; callers in this file pass -1 when inserting
// depsKnownBefore:  upper bound of the txns considered; when it is the same object as plainTxnId the
//                   walk stops at insertPos, otherwise at its insertion point found at/after insertPos
// deps:             sorted dependency ids; entries before redundantBefore.shardRedundantBefore() are skipped
//
// Entries of txns below the bound that do not appear in deps are recorded as "missing" if they are not
// the entry being updated, have a status before COMMITTED, and are witnessed by plainTxnId's kind.
// Entries of deps that are not matched against txns are collected as "additions".
//
// Returns the TxnInfo alone if there are no additions, otherwise an InfoWithAdditions.
private Object computeInfoAndAdditions(int insertPos, int updatePos, TxnId plainTxnId, InternalStatus newStatus, Timestamp executeAt, Timestamp depsKnownBefore, SortedList<TxnId> deps)
{
int depsKnownBeforePos;
// identity comparison: the caller passes the txnId instance itself when the bound is the txnId
if (depsKnownBefore == plainTxnId)
{
depsKnownBeforePos = insertPos;
}
else
{
depsKnownBeforePos = Arrays.binarySearch(txns, insertPos, txns.length, depsKnownBefore);
// the bound is expected not to coincide with any recorded transaction
Invariants.checkState(depsKnownBeforePos < 0);
depsKnownBeforePos = -1 - depsKnownBeforePos;
}
TxnId[] additions = NO_TXNIDS, missing = NO_TXNIDS;
int additionCount = 0, missingCount = 0;
// start from the first dependency that is not before the shard's redundant-before bound
int depsIndex = deps.find(redundantBefore.shardRedundantBefore());
if (depsIndex < 0) depsIndex = -1 - depsIndex;
int txnIdsIndex = 0;
// merge-walk both sorted sequences while each has entries remaining
while (txnIdsIndex < depsKnownBeforePos && depsIndex < deps.size())
{
TxnInfo t = txns[txnIdsIndex];
TxnId d = deps.get(depsIndex);
int c = t.compareTo(d);
if (c == 0)
{
// present in both: neither missing nor an addition
++txnIdsIndex;
++depsIndex;
}
else if (c < 0)
{
// recorded transaction that is absent from deps
// we expect to be missing ourselves
// we also permit any transaction we have recorded as COMMITTED or later to be missing, as recovery will not need to consult our information
if (txnIdsIndex != updatePos && t.status.compareTo(COMMITTED) < 0 && plainTxnId.kind().witnesses(t))
{
if (missingCount == missing.length)
missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2));
missing[missingCount++] = t.asPlainTxnId();
}
txnIdsIndex++;
}
else
{
// dependency that is not recorded in txns
if (additionCount >= additions.length)
additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2));
additions[additionCount++] = d;
depsIndex++;
}
}
// deps exhausted: any remaining recorded transactions below the bound are candidates for missing
while (txnIdsIndex < depsKnownBeforePos)
{
if (txnIdsIndex != updatePos && txns[txnIdsIndex].status.compareTo(COMMITTED) < 0)
{
TxnId txnId = txns[txnIdsIndex].asPlainTxnId();
if ((plainTxnId.kind().witnesses(txnId)))
{
if (missingCount == missing.length)
missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2));
missing[missingCount++] = txnId;
}
}
txnIdsIndex++;
}
// txns exhausted: all remaining deps are additions
while (depsIndex < deps.size())
{
if (additionCount >= additions.length)
additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2));
additions[additionCount++] = deps.get(depsIndex++);
}
// the missing buffer is completed here; the additions buffer is handed to the caller as-is
TxnInfo info = TxnInfo.create(plainTxnId, newStatus, executeAt, cachedTxnIds().completeAndDiscard(missing, missingCount));
if (additionCount == 0)
return info;
return new InfoWithAdditions(info, additions, additionCount);
}
// Creates a copy of this CommandsForKey that differs only in its transaction array.
private CommandsForKey update(TxnInfo[] newTxns)
{
    return new CommandsForKey(this.key, this.redundantBefore, newTxns, this.unmanageds);
}
// Convenience overload: extracts the TxnId, internal status and executeAt from the command and
// delegates, so that the progress log is notified directly of any command whose coordination we are
// now awaiting (as the lowest TxnId not committed).
public CommandsForKey notifyAndUpdatePending(SafeCommandStore safeStore, Command command, CommandsForKey prevCfk)
{
    TxnId updatedTxnId = command.txnId();
    InternalStatus newStatus = from(command.saveStatus());
    Timestamp newExecuteAt = command.executeAt();
    return notifyAndUpdatePending(safeStore, updatedTxnId, newStatus, newExecuteAt, prevCfk);
}
// Issues the notifications implied by updatedTxnId having moved to newStatus, by comparing this
// (the post-update state) against prevCfk (the pre-update state), then returns a new CommandsForKey
// with the same key, redundantBefore and txns but with the resulting pending unmanaged array.
//
// Throws AssertionError if newStatus is not one of the statuses handled by the switch below.
public CommandsForKey notifyAndUpdatePending(SafeCommandStore safeStore, TxnId updatedTxnId, InternalStatus newStatus, Timestamp newExecuteAt, CommandsForKey prevCfk)
{
TxnInfo prevInfo = prevCfk.get(updatedTxnId);
switch (newStatus)
{
default: throw new AssertionError("Unhandled InternalStatus: " + newStatus);
case TRANSITIVELY_KNOWN:
case PREACCEPTED:
case HISTORICAL:
case ACCEPTED:
// no execution-related notifications for these statuses
break;
case STABLE:
case COMMITTED:
// with no nextWrite, treat the update as executing at or before it
int compareWithNext = nextWrite == null ? -1 : newExecuteAt.compareTo(nextWrite.executeAt);
if (compareWithNext <= 0)
{
if (newStatus == COMMITTED)
break;
// the updated transaction itself might be executable
notify(safeStore, AnyGloballyVisible, next == null ? Timestamp.NONE : next.executeAt, newExecuteAt);
}
else
{
// compareWithNext > 0 is only reachable when nextWrite is non-null
// somebody waiting on us might now be ready to execute, but only if we execute after them and were not previously committed, but WERE previously known
// if the next command executes after us, or executes before we were started
// then our change in status cannot have triggered any change in immediate execution dependencies
if (prevInfo == null || prevInfo.status == COMMITTED || nextWrite.executeAt.compareTo(updatedTxnId) < 0 || newExecuteAt.equals(updatedTxnId))
break;
notify(safeStore, updatedTxnId.kind().witnessedBy(), next.executeAt, nextWrite.executeAt);
}
break;
case APPLIED:
case INVALID_OR_TRUNCATED:
// notify over the range from next up to nextWrite (or Timestamp.MAX when there is no nextWrite)
if (next != null)
notify(safeStore, updatedTxnId.kind().witnessedBy(), next.executeAt, nextWrite != null ? nextWrite.executeAt : Timestamp.MAX);
break;
}
Unmanaged[] pending = this.unmanageds;
// minUncommitted has changed relative to the previous state
if (minUncommitted != null && (prevCfk.minUncommitted == null || !prevCfk.minUncommitted.equals(minUncommitted)))
notifyWaitingOnCommit(safeStore, minUncommitted);
// the transaction has just crossed into COMMITTED-or-later (or was previously unknown)
if (newStatus.compareTo(COMMITTED) >= 0 && (prevInfo == null || prevInfo.status.compareTo(COMMITTED) < 0))
pending = notifyUnmanaged(safeStore, COMMIT, pending, minUncommitted == null ? Timestamp.MAX : minUncommitted, this);
if (minUncommitted == null || next != null)
pending = notifyUnmanaged(safeStore, APPLY, pending, next == null ? Timestamp.MAX : next.executeAt, this);
return new CommandsForKey(key, redundantBefore, txns, pending);
}
// Variant used when no specific transaction has been updated: notifies about a changed minUncommitted,
// processes pending APPLY unmanaged entries, and returns a new CommandsForKey carrying the resulting
// pending array.
public CommandsForKey notifyAndUpdatePending(SafeCommandStore safeStore, CommandsForKey prevCfk)
{
    Unmanaged[] pending = this.unmanageds;

    if (minUncommitted != null)
    {
        TxnInfo prevMinUncommitted = prevCfk.minUncommitted;
        // only notify when minUncommitted differs from the previous state
        if (prevMinUncommitted == null || !prevMinUncommitted.equals(minUncommitted))
            notifyWaitingOnCommit(safeStore, minUncommitted);
    }

    if (minUncommitted == null || next != null)
    {
        Timestamp applyBound = next == null ? Timestamp.MAX : next.executeAt;
        pending = notifyUnmanaged(safeStore, APPLY, pending, applyBound, this);
    }

    return new CommandsForKey(key, redundantBefore, txns, pending);
}
// Tells the progress log that we are waiting on the given uncommitted transaction, unless
// redundantBefore.endEpoch is at or before the transaction's epoch (in which case nothing is done).
//
// If the transaction's status is before PREACCEPTED, a task is submitted to the command store that
// initialises the command, records this key as an additional key if the command does not already
// contain it, and then either updates this key's CommandsForKey (if the command has been Committed)
// or registers the wait with the progress log. Otherwise the wait is registered immediately.
private void notifyWaitingOnCommit(SafeCommandStore safeStore, TxnInfo uncommitted)
{
if (redundantBefore.endEpoch <= uncommitted.epoch())
return;
TxnId txnId = uncommitted.asPlainTxnId();
if (uncommitted.status.compareTo(PREACCEPTED) < 0)
{
// copy the field to a local so the lambda below captures the key rather than this
Key key = this.key;
Keys keys = Keys.of(key);
safeStore = safeStore; // deliberate self-assignment: makes safeStore not effectively final, so the compiler rejects any accidental use of it inside the lambda (which must use safeStore0)
safeStore.commandStore().execute(PreLoadContext.contextFor(txnId, keys), safeStore0 -> {
SafeCommand safeCommand0 = safeStore0.get(txnId);
safeCommand0.initialise();
Command command = safeCommand0.current();
Seekables<?, ?> keysOrRanges = command.keysOrRanges();
// the command does not know about this key: record it as an additional key
if (keysOrRanges == null || !keysOrRanges.contains(key))
{
CommonAttributes.Mutable attrs = command.mutable();
if (command.additionalKeysOrRanges() == null) attrs.additionalKeysOrRanges(keys);
else attrs.additionalKeysOrRanges(keys.with((Keys)command.additionalKeysOrRanges()));
safeCommand0.update(safeStore0, command.updateAttributes(attrs));
}
// note: this tests the command as read before any attribute update above
if (command.hasBeen(Status.Committed))
{
// if we're committed but not invalidated, that means EITHER we have raced with a commit+
// OR we adopted as a dependency a
// NOTE(review): the sentence above is cut off in the original source — the second alternative needs completing
safeStore0.get(key).update(safeStore0, safeCommand0.current());
}
else
{
// TODO (desired): we could complicate our state machine to replicate PreCommitted here, so we can simply wait for ReadyToExclude
safeStore0.progressLog().waiting(txnId, WaitingToExecute, null, keys.toParticipants());
}
}).begin(safeStore.agent());
}
else
{
safeStore.progressLog().waiting(txnId, WaitingToExecute, null, Keys.of(key).toParticipants());
}
}
// Processes the pending unmanaged entries of the given kind that are affected by the supplied
// timestamp, and returns the resulting pending array (which may be the input array itself).
//
// COMMIT: the entries pending[0..end) are re-evaluated via updatePending (synchronously if the command
//         is loaded and initialised, otherwise asynchronously); any replacement records produced
//         synchronously are sorted and merged with the remaining entries pending[end..).
// APPLY:  the entries in [start, end) have their waiting-on state for this key removed and are dropped
//         from the returned array.
//
// Throws AssertionError if pendingStatus is neither COMMIT nor APPLY.
private static Unmanaged[] notifyUnmanaged(SafeCommandStore safeStore, Unmanaged.Pending pendingStatus, Unmanaged[] pending, Timestamp timestamp, CommandsForKey cfk)
{
switch (pendingStatus)
{
default: throw new AssertionError("Unhandled PendingRangeTransaction.Pending: " + pendingStatus);
case COMMIT:
{
// notify commit uses exclusive bounds, as we use minUncommitted
int end = findPendingCommit(pending, timestamp);
if (end < 0) end = -1 - end;
// at most one replacement record per processed entry
Unmanaged[] addPending = new Unmanaged[end];
int addPendingCount = 0;
for (int i = 0 ; i < end ; ++i)
{
TxnId txnId = pending[i].txnId;
SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId);
if (safeCommand != null)
{
// updatePending returns null when no new pending record is needed; the slot is then reused
addPending[addPendingCount] = updatePending(safeStore, safeCommand, cfk);
if (addPending[addPendingCount] != null) ++addPendingCount;
}
else updatePendingAsync(safeStore, txnId, cfk.key);
}
Arrays.sort(addPending, 0, addPendingCount);
// drop the processed prefix and merge in the replacement records
return SortedArrays.linearUnion(pending, end, pending.length, addPending, 0, addPendingCount, Unmanaged::compareTo, ArrayBuffers.uncached(Unmanaged[]::new));
}
case APPLY:
{
// this comparator never returns 0, so the search always yields an encoded insertion point:
// the boundary between entries that are not APPLY and entries that are
int start = -1 - SortedArrays.binarySearch(pending, 0, pending.length, timestamp, (f, v) -> v.pending == APPLY ? -1 : 1, FAST);
int end = findPendingApply(pending, timestamp);
if (end < 0) end = -1 - end;
if (start == end)
return pending;
for (int i = start; i < end ; ++i)
removeWaitingOn(safeStore, pending[i].txnId, cfk.key);
// copy everything except the removed range [start, end)
Unmanaged[] newPending = new Unmanaged[pending.length - (end - start)];
System.arraycopy(pending, 0, newPending, 0, start);
System.arraycopy(pending, end, newPending, start, newPending.length - start);
return newPending;
}
}
}
// Submits a task that loads the command and this key's CommandsForKey, re-evaluates the command's
// pending state via updatePending, and, if a new pending record results, stores a CommandsForKey with
// that record inserted into its unmanaged array.
private static void updatePendingAsync(SafeCommandStore safeStore, TxnId txnId, Key key)
{
    Keys keys = Keys.of(key);
    PreLoadContext context = PreLoadContext.contextFor(txnId, keys, KeyHistory.COMMANDS);
    safeStore.commandStore().execute(context, safeStore0 -> {
        SafeCommandsForKey safeCfk = safeStore0.get(key);
        CommandsForKey current = safeCfk.current();
        Unmanaged addition = updatePending(safeStore0, safeStore0.get(txnId), current);
        if (addition == null)
            return;

        Unmanaged[] withAddition = SortedArrays.insert(current.unmanageds, addition, Unmanaged[]::new);
        safeCfk.set(new CommandsForKey(current.key, current.redundantBefore, current.txns, withAddition));
    }).begin(safeStore.agent());
}
// Re-evaluates whether the (committed) command held by safeCommand is still waiting on any of its
// dependencies for cfk's key.
//
// Returns a new APPLY pending record if at least one relevant dependency has not yet reached APPLIED;
// otherwise removes the command's waiting-on state for this key and returns null. Also returns null,
// without doing anything, if the command has been Truncated.
//
// Throws an illegal state exception if a dependency is found to be absent from cfk.txns while later
// transactions remain to compare against.
private static Unmanaged updatePending(SafeCommandStore safeStore, SafeCommand safeCommand, CommandsForKey cfk)
{
    if (safeCommand.current().hasBeen(Status.Truncated))
        return null;

    Command.Committed command = safeCommand.current().asCommitted();
    TxnId waitingTxnId = command.txnId();
    Timestamp waitingExecuteAt = command.executeAt();
    SortedRelationList<TxnId> txnIds = command.partialDeps().keyDeps.txnIds(cfk.key);

    // skip dependencies before the shard's redundant-before bound
    int i = txnIds.find(cfk.shardRedundantBefore());
    if (i < 0) i = -1 - i;
    if (i < txnIds.size())
    {
        boolean readyToExecute = true;
        Timestamp executesAt = null;
        int j = SortedArrays.binarySearch(cfk.txns, 0, cfk.txns.length, txnIds.get(i), Timestamp::compareTo, FAST);
        if (j < 0) j = -1 - j;
        while (i < txnIds.size() && j < cfk.txns.length)
        {
            int c = txnIds.get(i).compareTo(cfk.txns[j]);
            if (c > 0) ++j;
            // the dependency sorts before the current recorded transaction, so it is the dependency
            // (not cfk.txns[j]) that is absent and must be reported
            else if (c < 0) throw illegalState("Transaction not found: " + txnIds.get(i));
            else
            {
                TxnInfo txn = cfk.txns[j];
                if (waitingTxnId.kind().awaitsOnlyDeps() || txn.executeAt.compareTo(waitingExecuteAt) < 0)
                {
                    readyToExecute &= txn.status.compareTo(APPLIED) >= 0;
                    executesAt = Timestamp.nonNullOrMax(executesAt, txn.executeAt);
                }
                ++i;
                ++j;
            }
        }

        if (!readyToExecute)
        {
            // convert up front, for every kind, so that neither the waitingOn state nor the pending
            // record holds on to a TxnInfo (consistent with registerUnmanaged)
            if (executesAt instanceof TxnInfo)
                executesAt = ((TxnInfo) executesAt).asPlainTxnId();

            if (waitingTxnId.kind().awaitsOnlyDeps() && executesAt != null)
            {
                if (executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0)
                {
                    Command.WaitingOn.Update waitingOn = new Command.WaitingOn.Update(command.waitingOn);
                    waitingOn.updateExecuteAtLeast(executesAt);
                    safeCommand.updateWaitingOn(waitingOn);
                }
            }
            return new Unmanaged(APPLY, command.txnId(), executesAt);
        }
    }

    removeWaitingOn(safeStore, safeCommand, cfk.key, true);
    return null;
}
// Binary searches pending (CEIL mode) for txnId among the entries that are not pending APPLY: any APPLY
// entry compares as greater than the search key, all others are compared by their waitingUntil.
private static int findPendingCommit(Unmanaged[] pending, Timestamp txnId)
{
    return SortedArrays.binarySearch(pending, 0, pending.length, txnId,
                                     (find, entry) -> entry.pending == APPLY ? -1 : find.compareTo(entry.waitingUntil),
                                     CEIL);
}
// Binary searches pending (CEIL mode) for txnId among the entries that are not pending COMMIT: any
// COMMIT entry compares as less than the search key, all others are compared by their waitingUntil.
private static int findPendingApply(Unmanaged[] pending, Timestamp txnId)
{
    return SortedArrays.binarySearch(pending, 0, pending.length, txnId,
                                     (find, entry) -> entry.pending == COMMIT ? 1 : find.compareTo(entry.waitingUntil),
                                     CEIL);
}
// Registers the (committed) command held by safeCommand as waiting on this key.
//
// The command's dependencies for this key (from shardRedundantBefore() onwards) are compared with txns:
//  - a dependency not present in txns is collected and later inserted as TRANSITIVELY_KNOWN;
//  - a dependency with status before COMMITTED means the command must wait for a commit;
//  - otherwise a relevant dependency that has not reached APPLIED means the command must wait for apply.
//
// Returns a new CommandsForKey containing any newly inserted transactions and a new pending record
// (APPLY or COMMIT) if the command must wait; otherwise removes the command's waiting-on state for this
// key and returns this. Returns this immediately if the command has been Truncated.
public CommandsForKey registerUnmanaged(SafeCommandStore safeStore, SafeCommand safeCommand)
{
    if (safeCommand.current().hasBeen(Status.Truncated))
        return this;

    Command.Committed command = safeCommand.current().asCommitted();
    TxnId waitingTxnId = command.txnId();
    Timestamp waitingExecuteAt = command.executeAt();
    SortedRelationList<TxnId> txnIds = command.partialDeps().keyDeps.txnIds(key);
    // NOTE(review): this buffer is obtained from cachedTxnIds() but is not handed back on any path
    // within this method — confirm whether it should be discarded
    TxnId[] missing = cachedTxnIds().get(8);
    int missingCount = 0;

    // skip dependencies before the shard's redundant-before bound
    int i = txnIds.find(shardRedundantBefore());
    if (i < 0) i = -1 - i;
    if (i < txnIds.size())
    {
        boolean readyToExecute = true;
        boolean waitingToApply = true;
        Timestamp executesAt = null;
        int j = SortedArrays.binarySearch(txns, 0, txns.length, txnIds.get(i), Timestamp::compareTo, FAST);
        if (j < 0) j = -1 - j;
        while (i < txnIds.size())
        {
            // once txns is exhausted every remaining dependency is treated as absent
            int c = j == txns.length ? -1 : txnIds.get(i).compareTo(txns[j]);
            if (c == 0)
            {
                TxnInfo txn = txns[j];
                if (txn.status.compareTo(COMMITTED) < 0) readyToExecute = waitingToApply = false;
                else if (waitingTxnId.kind().awaitsOnlyDeps() || txn.executeAt.compareTo(waitingExecuteAt) < 0)
                {
                    readyToExecute &= txn.status.compareTo(APPLIED) >= 0;
                    executesAt = Timestamp.nonNullOrMax(executesAt, txn.executeAt);
                }
                ++i;
                ++j;
            }
            else if (c > 0) ++j;
            else
            {
                // dependency not recorded in txns
                readyToExecute = waitingToApply = false;
                if (missingCount == missing.length)
                    missing = cachedTxnIds().resize(missing, missingCount, missingCount * 2);
                missing[missingCount++] = txnIds.get(i++);
            }
        }

        if (!readyToExecute)
        {
            TxnInfo[] newTxns = txns;
            if (missingCount > 0)
            {
                // merge the unknown dependencies into txns as TRANSITIVELY_KNOWN entries
                newTxns = new TxnInfo[missingCount + txns.length];
                i = 0;
                j = 0;
                int count = 0;
                while (i < missingCount && j < txns.length)
                {
                    int c = missing[i].compareTo(txns[j]);
                    if (c < 0) newTxns[count++] = TxnInfo.create(missing[i++], TRANSITIVELY_KNOWN);
                    else newTxns[count++] = txns[j++];
                }
                while (i < missingCount)
                    newTxns[count++] = TxnInfo.create(missing[i++], TRANSITIVELY_KNOWN);
                while (j < txns.length)
                    newTxns[count++] = txns[j++];
            }

            Unmanaged newPendingRecord;
            if (waitingToApply)
            {
                // convert before use, so that neither the waitingOn state nor the pending record holds
                // on to a TxnInfo (consistent with updatePending); instanceof is false for null
                if (executesAt instanceof TxnInfo)
                    executesAt = ((TxnInfo) executesAt).asPlainTxnId();

                if (waitingTxnId.kind().awaitsOnlyDeps() && executesAt != null)
                {
                    if (executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0)
                    {
                        Command.WaitingOn.Update waitingOn = new Command.WaitingOn.Update(command.waitingOn);
                        waitingOn.updateExecuteAtLeast(executesAt);
                        safeCommand.updateWaitingOn(waitingOn);
                    }
                }
                newPendingRecord = new Unmanaged(APPLY, command.txnId(), executesAt);
            }
            // waiting for a commit: wait until the last dependency for this key
            else newPendingRecord = new Unmanaged(COMMIT, command.txnId(), txnIds.get(txnIds.size() - 1));

            Unmanaged[] newPending = SortedArrays.insert(unmanageds, newPendingRecord, Unmanaged[]::new);
            return new CommandsForKey(key, redundantBefore, newTxns, newPending);
        }
    }

    removeWaitingOn(safeStore, safeCommand, key, true);
    return this;
}
// Translates the executeAt range [from, to] into an index range over committed and delegates to the
// index-based notify. The end index is exclusive: when an entry matching "to" is found it is included.
private void notify(SafeCommandStore safeStore, Kinds kinds, Timestamp from, Timestamp to)
{
    int startIndex = SortedArrays.binarySearch(committed, 0, committed.length, from, (bound, info) -> bound.compareTo(info.executeAt), FAST);
    if (startIndex < 0)
        startIndex = -1 - startIndex;

    int toIndex = SortedArrays.binarySearch(committed, startIndex, committed.length, to, (bound, info) -> bound.compareTo(info.executeAt), FAST);
    int endIndex = toIndex < 0 ? -1 - toIndex : toIndex + 1;

    notify(safeStore, kinds, startIndex, endIndex);
}
// Visits committed[start..end) and notifies about transactions that may now make progress:
//  - APPLIED / INVALID_OR_TRUNCATED entries are skipped entirely;
//  - COMMITTED entries whose kind passes the kinds filter are reported to the progress log as
//    WaitingToExecute;
//  - STABLE entries have this key removed from their waiting-on state, and are reported as
//    WaitingToApply, when the number of earlier not-yet-applied transactions they are expected to be
//    missing equals the size of their missing collection (counted from minUncommitted onwards).
// Running per-kind counts of the visited COMMITTED/STABLE entries (and of uncommitted entries of txns
// ordered before a STABLE entry's executeAt) feed the expected-missing computation.
private void notify(SafeCommandStore safeStore, Kinds kinds, int start, int end)
{
// created lazily, on first use
Participants<?> asParticipants = null;
// -1 means "not yet located"; when there is no minUncommitted the backfill loop below never runs
int uncommittedIndex = minUncommitted == null ? txns.length : -1;
int unappliedReadCount = 0, unappliedWriteCount = 0, unappliedSyncPointCount = 0;
for (int i = start ; i < end ; ++i)
{
TxnInfo txn = committed[i];
Kind kind = txn.kind();
switch (txn.status)
{
case APPLIED:
case INVALID_OR_TRUNCATED:
// neither notified nor counted
continue;
case COMMITTED:
// cannot execute as dependencies not stable, so notify progress log to get or decide stable deps
if (kinds.test(kind))
{
if (asParticipants == null)
asParticipants = Keys.of(key).toParticipants();
TxnId txnId = new TxnId(txn);
safeStore.progressLog().waiting(txnId, WaitingToExecute, null, asParticipants);
}
break;
case STABLE:
if (uncommittedIndex < 0)
uncommittedIndex = SortedArrays.binarySearch(txns, 0, txns.length, minUncommitted, Timestamp::compareTo, FAST);
// count uncommitted transactions in txns ordered before this transaction's executeAt
int nextUncommittedIndex = SortedArrays.exponentialSearch(txns, uncommittedIndex, txns.length, txn.executeAt, Timestamp::compareTo, FAST);
if (nextUncommittedIndex < 0) nextUncommittedIndex = -1 -nextUncommittedIndex;
while (uncommittedIndex < nextUncommittedIndex)
{
TxnInfo backfillTxn = txns[uncommittedIndex++];
if (backfillTxn.status.compareTo(COMMITTED) >= 0) continue;
Kind backfillKind = backfillTxn.kind();
switch (backfillKind)
{
default: throw new AssertionError("Unhandled Txn.Kind: " + backfillKind);
case LocalOnly:
case EphemeralRead:
throw illegalState("Invalid Txn.Kind for CommandsForKey: " + backfillKind);
case Write:
++unappliedWriteCount;
break;
case Read:
++unappliedReadCount;
break;
case SyncPoint:
case ExclusiveSyncPoint:
++unappliedSyncPointCount;
break;
}
}
int expectMissingCount = 0;
// deliberate fall-through: sync points accumulate sync point + read + write counts,
// writes accumulate read + write counts, reads accumulate only the write count
switch (txn.kind())
{
default: throw new AssertionError("Unhandled Txn.Kind: " + kind);
case LocalOnly:
case EphemeralRead:
throw illegalState("Invalid Txn.Kind for CommandsForKey: " + kind);
case SyncPoint:
case ExclusiveSyncPoint:
expectMissingCount += unappliedSyncPointCount;
case Write:
expectMissingCount += unappliedReadCount;
case Read:
expectMissingCount += unappliedWriteCount;
}
// We remove committed transactions from the missing set, since they no longer need them there
// So the missing collection represents only those uncommitted transaction ids that a transaction
// witnesses/conflicts with. So we may simply count all of those we know of with a lower TxnId,
// and if the count is the same then we are not awaiting any of them for execution and can remove
// this command's dependency on this key for execution.
TxnId[] missing = txn.missing();
int missingCount = missing.length;
// only count missing entries at or after minUncommitted
if (missingCount > 0 && minUncommitted != null)
{
int missingFrom = SortedArrays.binarySearch(missing, 0, missing.length, minUncommitted, TxnId::compareTo, FAST);
if (missingFrom < 0) missingFrom = -1 - missingFrom;
missingCount -= missingFrom;
}
if (expectMissingCount == missingCount)
{
TxnId txnId = new TxnId(txn);
removeWaitingOn(safeStore, txnId, key);
if (asParticipants == null)
asParticipants = Keys.of(key).toParticipants();
safeStore.progressLog().waiting(txnId, WaitingToApply, null, asParticipants);
}
}
// reached for COMMITTED and STABLE entries: include this transaction in the running counts
switch (kind)
{
default: throw new AssertionError("Unhandled Txn.Kind: " + kind);
case LocalOnly:
case EphemeralRead:
throw illegalState("Invalid Txn.Kind for CommandsForKey: " + kind);
case Write:
++unappliedWriteCount;
break;
case Read:
++unappliedReadCount;
break;
case SyncPoint:
case ExclusiveSyncPoint:
++unappliedSyncPointCount;
break;
}
}
}
// Removes the given key from the waiting-on state of the command identified by txnId. If the command
// is already loaded and initialised this happens immediately; otherwise a task is submitted to the
// command store to load the command first.
private static void removeWaitingOn(SafeCommandStore safeStore, TxnId txnId, Key key)
{
    SafeCommand loaded = safeStore.ifLoadedAndInitialised(txnId);
    if (loaded == null)
    {
        PreLoadContext context = PreLoadContext.contextFor(txnId);
        safeStore.commandStore()
                 .execute(context, safeStore0 -> removeWaitingOn(safeStore0, safeStore0.get(txnId), key, false))
                 .begin(safeStore.agent());
        return;
    }

    removeWaitingOn(safeStore, loaded, key, true);
}
// Thin wrapper that delegates to Commands.removeWaitingOnKeyAndMaybeExecute, forwarding all arguments
// unchanged. Within this file executeAsync is passed as true when invoked directly from the calling
// context and false when invoked from within a task submitted to the command store.
private static void removeWaitingOn(SafeCommandStore safeStore, SafeCommand safeCommand, Key key, boolean executeAsync)
{
Commands.removeWaitingOnKeyAndMaybeExecute(safeStore, safeCommand, key, executeAsync);
}
// Returns a CommandsForKey using the new RedundantBefore entry. Transactions ordered before the new
// shardRedundantBefore() are dropped from txns, and the missing collections of the remaining
// transactions are trimmed of any ids before that bound. Returns this if the entry is unchanged.
// The new bound must not be behind the current one.
public CommandsForKey withRedundantBefore(RedundantBefore.Entry newRedundantBefore)
{
    Invariants.checkArgument(newRedundantBefore.shardRedundantBefore().compareTo(shardRedundantBefore()) >= 0, "Expect new RedundantBefore.Entry shardAppliedOrInvalidatedBefore to be ahead of existing one");
    if (newRedundantBefore.equals(redundantBefore))
        return this;

    boolean notBehind = newRedundantBefore.shardRedundantBefore().compareTo(shardRedundantBefore()) >= 0;
    int firstRetained = notBehind ? insertPos(0, newRedundantBefore.shardRedundantBefore()) : 0;

    TxnInfo[] newTxns = txns;
    if (firstRetained != 0)
    {
        newTxns = Arrays.copyOfRange(txns, firstRetained, txns.length);
        for (int i = 0 ; i < newTxns.length ; ++i)
        {
            TxnInfo txn = newTxns[i];
            TxnId[] missing = txn.missing();
            if (missing == NO_TXNIDS)
                continue;

            // number of leading missing ids that fall before the new bound
            int trim = Arrays.binarySearch(missing, newRedundantBefore.shardRedundantBefore());
            if (trim < 0)
                trim = -1 - trim;

            if (trim > 0)
            {
                TxnId[] trimmed = trim == missing.length ? NO_TXNIDS : Arrays.copyOfRange(missing, trim, missing.length);
                newTxns[i] = txn.update(trimmed);
            }
        }
    }

    // TODO (expected): filter pending unmanageds
    return new CommandsForKey(key, newRedundantBefore, newTxns, unmanageds);
}
// Records txnId with HISTORICAL status. Ids before shardRedundantBefore() are ignored, as are ids
// already recorded with a status of HISTORICAL or later; otherwise the entry is inserted or updated.
public CommandsForKey registerHistorical(TxnId txnId)
{
    if (txnId.compareTo(shardRedundantBefore()) < 0)
        return this;

    int pos = Arrays.binarySearch(txns, txnId);
    if (pos < 0)
        return insert(-1 - pos, txnId, TxnInfo.create(txnId, HISTORICAL, txnId));

    TxnInfo existing = txns[pos];
    if (existing.status.compareTo(HISTORICAL) >= 0)
        return this;

    return update(pos, txnId, existing, TxnInfo.create(txnId, HISTORICAL, txnId));
}
// Returns the index of timestamp within txns[min..), or the index at which it would be inserted
// if it is not present.
private int insertPos(int min, Timestamp timestamp)
{
    int found = Arrays.binarySearch(txns, min, txns.length, timestamp);
    return found < 0 ? -1 - found : found;
}
// Returns the first entry of txns, or null if there are none.
public TxnId findFirst()
{
    if (txns.length == 0)
        return null;
    return txns[0];
}
// Two instances are equal when they are of the same class and have equal keys and equal txns arrays;
// no other field takes part in the comparison.
@Override
public boolean equals(Object o)
{
    if (o == this)
        return true;
    if (o == null)
        return false;
    if (o.getClass() != getClass())
        return false;

    CommandsForKey other = (CommandsForKey) o;
    if (!Objects.equals(key, other.key))
        return false;
    return Arrays.equals(txns, other.txns);
}
// Hashing is not supported: this always throws UnsupportedOperationException, so instances cannot be
// used as keys in hash-based collections even though equals is overridden.
@Override
public int hashCode()
{
throw new UnsupportedOperationException();
}
}