blob: 6c8b77efac30f0ddaed5f1289b4b6e9d48513848 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.ProgressLog.ProgressShard;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.coordinate.Infer;
import accord.local.Command.ProxyListener;
import accord.local.Command.WaitingOn;
import accord.local.SaveStatus.LocalExecution;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.EpochSupplier;
import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Seekables;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.primitives.Writes;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import static accord.api.ProgressLog.ProgressShard.Home;
import static accord.api.ProgressLog.ProgressShard.Local;
import static accord.api.ProgressLog.ProgressShard.No;
import static accord.api.ProgressLog.ProgressShard.UnmanagedHome;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.local.Cleanup.ERASE;
import static accord.local.Cleanup.shouldCleanup;
import static accord.local.Command.Truncated.erased;
import static accord.local.Command.Truncated.truncatedApply;
import static accord.local.Command.Truncated.truncatedApplyWithOutcome;
import static accord.local.Commands.EnsureAction.Add;
import static accord.local.Commands.EnsureAction.Ignore;
import static accord.local.Commands.EnsureAction.Set;
import static accord.local.Commands.EnsureAction.TryAdd;
import static accord.local.Commands.EnsureAction.TrySet;
import static accord.local.KeyHistory.TIMESTAMPS;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.RedundantStatus.PRE_BOOTSTRAP_OR_STALE;
import static accord.local.SaveStatus.Applying;
import static accord.local.SaveStatus.Erased;
import static accord.local.SaveStatus.LocalExecution.ReadyToExclude;
import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
import static accord.local.SaveStatus.TruncatedApply;
import static accord.local.Status.Accepted;
import static accord.local.Status.AcceptedInvalidate;
import static accord.local.Status.Applied;
import static accord.local.Status.Committed;
import static accord.local.Status.Durability;
import static accord.local.Status.Invalidated;
import static accord.local.Status.KnownExecuteAt.ExecuteAtKnown;
import static accord.local.Status.NotDefined;
import static accord.local.Status.PreAccepted;
import static accord.local.Status.PreApplied;
import static accord.local.Status.PreCommitted;
import static accord.local.Status.ReadyToExecute;
import static accord.local.Status.Stable;
import static accord.local.Status.Truncated;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Route.isFullRoute;
import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.utils.Invariants.illegalState;
public class Commands
{
private static final Logger logger = LoggerFactory.getLogger(Commands.class);
private Commands()
{
}
private static Ranges covers(@Nullable PartialTxn txn)
{
return txn == null ? null : txn.covering();
}
private static Ranges covers(@Nullable PartialDeps deps)
{
return deps == null ? null : deps.covering;
}
private static boolean hasQuery(PartialTxn txn)
{
return txn != null && txn.query() != null;
}
/**
* true iff this commandStore owns the given key on the given epoch
*/
public static boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey)
{
return safeStore.ranges().allAt(epoch).contains(someKey);
}
public enum AcceptOutcome { Success, Redundant, RejectedBallot, Truncated }
public static AcceptOutcome preaccept(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId txnId, long acceptEpoch, PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey)
{
return preacceptOrRecover(safeStore, safeCommand, txnId, acceptEpoch, partialTxn, route, progressKey, Ballot.ZERO);
}
public static AcceptOutcome recover(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId txnId, PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
// for recovery we only ever propose either the original epoch or an Accept that we witness; otherwise we invalidate
return preacceptOrRecover(safeStore, safeCommand, txnId, txnId.epoch(), partialTxn, route, progressKey, ballot);
}
private static AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId txnId, long acceptEpoch, PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
Command command = safeCommand.current();
if (command.hasBeen(Truncated))
{
logger.trace("{}: skipping preaccept - command is truncated", txnId);
return command.is(Invalidated) ? AcceptOutcome.RejectedBallot : AcceptOutcome.Truncated;
}
int compareBallots = command.promised().compareTo(ballot);
if (compareBallots > 0)
{
logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId, command.promised());
return AcceptOutcome.RejectedBallot;
}
if (command.known().definition.isKnown())
{
Invariants.checkState(command.status() == Invalidated || command.executeAt() != null);
logger.trace("{}: skipping preaccept - already known ({})", txnId, command.status());
// in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
// preaccept; in the former case we should abandon coordination, and in the latter we have already completed
safeCommand.updatePromised(ballot);
return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
}
Ranges preacceptRanges = preacceptRanges(safeStore, txnId, acceptEpoch);
Invariants.checkState(!preacceptRanges.isEmpty());
ProgressShard shard = progressShard(route, progressKey, preacceptRanges);
Invariants.checkState(validate(command.status(), command, Ranges.EMPTY, preacceptRanges, shard, route, Set, partialTxn, Set, null, Ignore));
// FIXME: this should go into a consumer method
CommonAttributes attrs = set(command, Ranges.EMPTY, preacceptRanges, shard, route, partialTxn, Set, null, Ignore);
if (command.executeAt() == null)
{
// unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
// - use a global logical clock to issue new timestamps; or
// - assign each shard _and_ process a unique id, and use both as components of the timestamp
// if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
// invalidate any transactions that were not completed by their initial coordinator
// TODO (desired): limit preaccept to keys we include, to avoid inflating unnecessary state
Timestamp executeAt = safeStore.commandStore().preaccept(txnId, attrs.partialTxn().keys(), safeStore, ballot.equals(Ballot.ZERO));
command = safeCommand.preaccept(safeStore, attrs, executeAt, ballot);
safeStore.progressLog().preaccepted(command, shard);
}
else
{
// TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
safeCommand.markDefined(safeStore, attrs, ballot);
}
safeStore.notifyListeners(safeCommand);
return AcceptOutcome.Success;
}
public static boolean preacceptInvalidate(SafeCommand safeCommand, Ballot ballot)
{
Command command = safeCommand.current();
if (command.hasBeen(Status.Committed))
{
if (command.is(Truncated)) logger.trace("{}: skipping preacceptInvalidate - already truncated", command.txnId());
else if (command.is(Invalidated)) logger.trace("{}: skipping preacceptInvalidate - already invalidated", command.txnId());
else logger.trace("{}: skipping preacceptInvalidate - already committed", command.txnId());
return false;
}
if (command.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", command.txnId(), command.promised());
return false;
}
safeCommand.updatePromised(ballot);
return true;
}
public static AcceptOutcome accept(SafeCommandStore safeStore, TxnId txnId, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keysOrRanges, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
{
SafeCommand safeCommand = safeStore.get(txnId, executeAt, route);
Command command = safeCommand.current();
if (command.hasBeen(PreCommitted))
{
if (command.is(Invalidated))
{
logger.trace("{}: skipping accept - command is invalidated", txnId);
return AcceptOutcome.RejectedBallot;
}
if (command.is(Truncated))
{
logger.trace("{}: skipping accept - command is truncated", txnId);
return AcceptOutcome.Truncated;
}
logger.trace("{}: skipping accept - already committed ({})", txnId, command.status());
return AcceptOutcome.Redundant;
}
if (command.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId, command.promised(), ballot);
return AcceptOutcome.RejectedBallot;
}
Ranges coordinateRanges = coordinateRanges(safeStore, txnId);
Ranges acceptRanges = acceptRanges(safeStore, txnId, executeAt, coordinateRanges);
Invariants.checkState(!acceptRanges.isEmpty());
ProgressShard shard = progressShard(route, progressKey, coordinateRanges);
Invariants.checkState(validate(command.status(), command, coordinateRanges, acceptRanges, shard, route, Ignore, null, Ignore, partialDeps, Set));
// TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
// distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
// recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
CommonAttributes attrs = set(command, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
command = safeCommand.accept(safeStore, keysOrRanges, attrs, executeAt, ballot);
safeStore.progressLog().accepted(command, shard);
safeStore.notifyListeners(safeCommand);
return AcceptOutcome.Success;
}
public static AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, SafeCommand safeCommand, Ballot ballot)
{
// TODO (expected): save some partial route we can use to determine if it can be GC'd
Command command = safeCommand.current();
if (command.hasBeen(PreCommitted))
{
if (command.is(Invalidated))
{
logger.trace("{}: skipping accept invalidated - already invalidated ({})", command.txnId(), command.status());
return AcceptOutcome.Redundant;
}
if (command.is(Truncated))
{
logger.trace("{}: skipping accept invalidated - already truncated ({})", command.txnId(), command.status());
return AcceptOutcome.Truncated;
}
logger.trace("{}: skipping accept invalidated - already committed ({})", command.txnId(), command.status());
return AcceptOutcome.RejectedBallot;
}
if (command.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", command.txnId(), command.promised(), ballot);
return AcceptOutcome.RejectedBallot;
}
logger.trace("{}: accepted invalidated", command.txnId());
safeCommand.acceptInvalidated(safeStore, ballot);
safeStore.notifyListeners(safeCommand);
return AcceptOutcome.Success;
}
public enum CommitOutcome { Success, Rejected, Redundant, Insufficient }
// relies on mutual exclusion for each key
public static CommitOutcome commit(SafeCommandStore safeStore, SafeCommand safeCommand, SaveStatus newStatus, Ballot ballot, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
{
Command command = safeCommand.current();
SaveStatus curStatus = command.saveStatus();
Invariants.checkArgument(newStatus == SaveStatus.Committed || newStatus == SaveStatus.Stable);
if (newStatus == SaveStatus.Committed && ballot.compareTo(command.promised()) < 0)
return CommitOutcome.Rejected;
if (curStatus.hasBeen(PreCommitted))
{
if (!curStatus.is(Truncated))
{
if (!executeAt.equals(command.executeAt()) || curStatus.status == Invalidated)
safeStore.agent().onInconsistentTimestamp(command, (curStatus.status == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt);
}
if (curStatus.compareTo(newStatus) > 0 || curStatus.hasBeen(Stable))
{
logger.trace("{}: skipping commit - already newer or stable ({})", txnId, command.status());
return CommitOutcome.Redundant;
}
if (curStatus == SaveStatus.Committed && newStatus == SaveStatus.Committed)
{
if (ballot.equals(command.acceptedOrCommitted()))
return CommitOutcome.Redundant;
Invariants.checkState(ballot.compareTo(command.acceptedOrCommitted()) > 0);
}
}
Ranges coordinateRanges = coordinateRanges(safeStore, txnId);
Ranges acceptRanges = acceptRanges(safeStore, txnId, executeAt, coordinateRanges);
ProgressShard shard = progressShard(route, progressKey, coordinateRanges);
if (!validate(command.status(), command, coordinateRanges, acceptRanges, shard, route, Add, partialTxn, Add, partialDeps, Set))
return CommitOutcome.Insufficient;
// FIXME: split up set
CommonAttributes attrs = set(command, coordinateRanges, acceptRanges, shard, route, partialTxn, Add, partialDeps, Set);
logger.trace("{}: committed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);
if (newStatus == SaveStatus.Stable)
{
WaitingOn waitingOn = initialiseWaitingOn(safeStore, txnId, executeAt, attrs.partialDeps(), attrs.route());
command = safeCommand.stable(safeStore, attrs, Ballot.max(command.acceptedOrCommitted(), ballot), executeAt, waitingOn);
safeStore.progressLog().stable(command, shard);
safeStore.agent().metricsEventsListener().onStable(command);
// TODO (expected, safety): introduce intermediate status to avoid reentry when notifying listeners (which might notify us)
maybeExecute(safeStore, safeCommand, true, true);
}
else
{
Invariants.checkArgument(command.acceptedOrCommitted().compareTo(ballot) <= 0);
command = safeCommand.commit(safeStore, attrs, ballot, executeAt);
safeStore.progressLog().precommitted(command);
safeStore.notifyListeners(safeCommand);
safeStore.agent().metricsEventsListener().onCommitted(command);
}
return CommitOutcome.Success;
}
// relies on mutual exclusion for each key
public static CommitOutcome precommit(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId txnId, Timestamp executeAt, Route<?> route)
{
Command command = safeCommand.current();
if (command.hasBeen(PreCommitted))
{
if (command.is(Truncated))
{
logger.trace("{}: skipping commit - already truncated ({})", txnId, command.status());
return CommitOutcome.Redundant;
}
else
{
logger.trace("{}: skipping precommit - already committed ({})", txnId, command.status());
if (executeAt.equals(command.executeAt()) && command.status() != Invalidated)
return CommitOutcome.Redundant;
safeStore.agent().onInconsistentTimestamp(command, (command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt);
}
}
CommonAttributes attrs = command;
if (command.route() == null || !command.route().kind().isFullRoute())
attrs = updateRoute(command, route);
safeCommand.precommit(safeStore, attrs, executeAt);
safeStore.progressLog().precommitted(command);
safeStore.notifyListeners(safeCommand);
logger.trace("{}: precommitted with executeAt: {}", txnId, executeAt);
return CommitOutcome.Success;
}
public static void createBootstrapCompleteMarkerTransaction(SafeCommandStore safeStore, TxnId localSyncId, SyncPoint syncPoint, Seekables<?, ?> keys)
{
SafeCommand safeCommand = safeStore.get(localSyncId);
Command command = safeCommand.current();
Invariants.checkState(!command.hasBeen(Committed));
createBootstrapCompleteMarkerTransaction(safeStore, localSyncId, keys, syncPoint.route());
}
private static void createBootstrapCompleteMarkerTransaction(SafeCommandStore safeStore, TxnId localSyncId, Seekables<?, ?> keys, Route<?> route)
{
SafeCommand safeCommand = safeStore.get(localSyncId);
Command command = safeCommand.current();
if (command.hasBeen(Stable))
return;
Ranges coordinateRanges = coordinateRanges(safeStore, localSyncId);
// TODO (desired, consider): in the case of sync points, the coordinator is unlikely to be a home shard, do we mind this? should document at least
Txn emptyTxn = safeStore.agent().emptyTxn(localSyncId.kind(), keys);
ProgressShard progressShard = coordinateRanges.contains(route.homeKey()) ? UnmanagedHome : No;
PartialDeps none = Deps.NONE.slice(coordinateRanges);
PartialTxn partialTxn = emptyTxn.slice(coordinateRanges, true);
Invariants.checkState(validate(command.status(), command, Ranges.EMPTY, coordinateRanges, progressShard, route, Set, partialTxn, Set, none, Set));
CommonAttributes newAttributes = set(command, Ranges.EMPTY, coordinateRanges, progressShard, route, partialTxn, Set, none, Set);
safeCommand.stable(safeStore, newAttributes, Ballot.ZERO, localSyncId, WaitingOn.EMPTY);
safeStore.notifyListeners(safeCommand);
}
public static void ephemeralRead(SafeCommandStore safeStore, SafeCommand safeCommand, Route<?> route, TxnId txnId, PartialTxn partialTxn, PartialDeps partialDeps, long executeAtEpoch)
{
// TODO (expected): introduce in-memory only commands
Command command = safeCommand.current();
if (command.hasBeen(Stable))
return;
// TODO (required): by creating synthetic TxnId in future epochs we may not be evictable
// but for ephemeral reads we want parallel eviction - or preferably no durability - anyway
txnId = txnId.withEpoch(executeAtEpoch);
Ranges coordinateRanges = coordinateRanges(safeStore, txnId);
// TODO (desired, consider): in the case of sync points, the coordinator is unlikely to be a home shard, do we mind this? should document at least
ProgressShard progressShard = No;
Invariants.checkState(validate(command.status(), command, Ranges.EMPTY, coordinateRanges, progressShard, route, Set, partialTxn, Set, partialDeps, Set));
CommonAttributes attrs = set(command, Ranges.EMPTY, coordinateRanges, progressShard, route, partialTxn, Set, partialDeps, Set);
safeCommand.stable(safeStore, attrs, Ballot.ZERO, txnId, initialiseWaitingOn(safeStore, txnId, txnId, attrs.partialDeps(), route));
maybeExecute(safeStore, safeCommand, false, true);
}
public static void markBootstrapComplete(SafeCommandStore safeStore, TxnId localSyncId, Seekables<?, ?> keys)
{
SafeCommand safeCommand = safeStore.get(localSyncId);
Command.Committed command = safeCommand.current().asCommitted();
if (command.hasBeen(PreApplied))
return;
// NOTE: if this is ever made a non-empty txn this will introduce a potential bug where the txn is registered against CommandsForKeys
Txn emptyTxn = safeStore.agent().emptyTxn(localSyncId.kind(), keys);
safeCommand.preapplied(safeStore, command, command.executeAt(), command.waitingOn(), emptyTxn.execute(localSyncId, localSyncId, null), emptyTxn.result(localSyncId, localSyncId, null));
maybeExecute(safeStore, safeCommand, true, false);
}
// TODO (expected, ?): commitInvalidate may need to update cfks _if_ possible
public static void commitInvalidate(SafeCommandStore safeStore, SafeCommand safeCommand, Unseekables<?> scope)
{
Command command = safeCommand.current();
if (command.hasBeen(PreCommitted))
{
if (command.is(Truncated))
{
logger.trace("{}: skipping commit invalidated - already truncated ({})", safeCommand.txnId(), command.status());
}
else
{
logger.trace("{}: skipping commit invalidated - already committed ({})", safeCommand.txnId(), command.status());
if (!command.is(Invalidated) && !(command.is(Truncated) && command.executeAt().equals(Timestamp.NONE)))
safeStore.agent().onInconsistentTimestamp(command, Timestamp.NONE, command.executeAt());
}
return;
}
else if (command.saveStatus().isUninitialised() && !safeStore.ranges().allAt(command.txnId().epoch()).intersects(scope))
return; // don't bother propagating the invalidation to future epochs where the replica didn't already witness the command
safeCommand.commitInvalidated(safeStore);
safeStore.progressLog().clear(command.txnId());
logger.trace("{}: committed invalidated", safeCommand.txnId());
safeStore.notifyListeners(safeCommand, command, command.durableListeners(), safeCommand.transientListeners());
}
public enum ApplyOutcome { Success, Redundant, Insufficient }
public static ApplyOutcome apply(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result result)
{
Command command = safeCommand.current();
if (command.hasBeen(PreApplied) && executeAt.equals(command.executeAt()))
{
logger.trace("{}: skipping apply - already executed ({})", txnId, command.status());
return ApplyOutcome.Redundant;
}
else if (command.hasBeen(PreCommitted) && !executeAt.equals(command.executeAt()))
{
if (command.is(Truncated) && command.executeAt() == null)
return ApplyOutcome.Redundant;
safeStore.agent().onInconsistentTimestamp(command, command.executeAt(), executeAt);
}
Ranges coordinateRanges = coordinateRanges(safeStore, txnId);
Ranges acceptRanges = acceptRanges(safeStore, txnId, executeAt, coordinateRanges);
ProgressShard shard = progressShard(route, progressKey, coordinateRanges);
if (!validate(command.status(), command, coordinateRanges, acceptRanges, shard, route, TryAdd, partialTxn, Add, partialDeps, command.hasBeen(Committed) ? TryAdd : TrySet, safeStore))
return ApplyOutcome.Insufficient; // TODO (expected, consider): this should probably be an assertion failure if !TrySet
CommonAttributes attrs = set(command, coordinateRanges, acceptRanges, shard, route, partialTxn, Add, partialDeps, command.hasBeen(Committed) ? TryAdd : TrySet);
WaitingOn waitingOn = !command.hasBeen(Stable) ? initialiseWaitingOn(safeStore, txnId, executeAt, attrs.partialDeps(), attrs.route()) : command.asCommitted().waitingOn();
safeCommand.preapplied(safeStore, attrs, executeAt, waitingOn, writes, result);
safeStore.notifyListeners(safeCommand);
logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);
maybeExecute(safeStore, safeCommand, true, true);
safeStore.progressLog().executed(safeCommand.current(), shard);
safeStore.agent().metricsEventsListener().onExecuted(command);
return ApplyOutcome.Success;
}
public static void listenerUpdate(SafeCommandStore safeStore, SafeCommand safeListener, SafeCommand safeUpdated)
{
Command listener = safeListener.current();
Command updated = safeUpdated.current();
if (listener.is(NotDefined) || listener.is(Truncated))
{
// This listener must be a stale vestige
// TODO (desired): would be nice to ensure these are deregistered explicitly, but would be costly
Invariants.checkState(listener.saveStatus().isUninitialised() || listener.is(Truncated), "Listener status expected to be Uninitialised or Truncated, but was %s", listener.saveStatus());
Invariants.checkState(updated.hasBeen(Applied) || updated.is(NotDefined), "Updated status expected to be Applied or NotDefined, but was %s", updated);
safeUpdated.removeListener(listener.asListener());
return;
}
logger.trace("{}: updating as listener in response to change on {} with status {} ({})",
listener.txnId(), updated.txnId(), updated.status(), updated);
switch (updated.status())
{
default:
throw illegalState("Unexpected status: " + updated.status());
case NotDefined:
case PreAccepted:
case Accepted:
case AcceptedInvalidate:
break;
case PreCommitted:
case Committed:
case Stable:
case ReadyToExecute:
case PreApplied:
case Applied:
case Invalidated:
case Truncated:
updateDependencyAndMaybeExecute(safeStore, safeListener, safeUpdated, true);
break;
}
}
protected static void postApply(SafeCommandStore safeStore, TxnId txnId)
{
logger.trace("{} applied, setting status to Applied and notifying listeners", txnId);
SafeCommand safeCommand = safeStore.get(txnId);
safeCommand.applied(safeStore);
safeStore.notifyListeners(safeCommand);
}
/**
* The ranges for which we participate in the consensus decision of when a transaction executes
*/
private static Ranges coordinateRanges(SafeCommandStore safeStore, TxnId txnId)
{
return safeStore.ranges().coordinates(txnId);
}
/**
* The ranges for which we participate in the consensus decision of when a transaction executes
*/
private static Ranges preacceptRanges(SafeCommandStore safeStore, TxnId txnId, long untilEpoch)
{
return safeStore.ranges().allBetween(txnId.epoch(), untilEpoch);
}
private static Ranges acceptRanges(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, Ranges coordinateRanges)
{
return safeStore.ranges().extend(coordinateRanges, txnId, executeAt);
}
private static Ranges executeRanges(SafeCommandStore safeStore, Timestamp executeAt)
{
return safeStore.ranges().allAt(executeAt.epoch());
}
/**
* The ranges for which we participate in the execution of a transaction, excluding those ranges
* for transactions below a SyncPoint where we adopted the range, and that will be obtained from peers,
* and therefore we do not want to execute locally
*/
private static Ranges applyRanges(SafeCommandStore safeStore, Timestamp executeAt)
{
return safeStore.ranges().applyRanges(executeAt);
}
private static AsyncChain<Void> applyChain(SafeCommandStore safeStore, PreLoadContext context, TxnId txnId)
{
Command.Executed command = safeStore.get(txnId).current().asExecuted();
if (command.hasBeen(Applied))
return AsyncChains.success(null);
// TODO (required): make sure we are correctly handling (esp. C* side with validation logic) executing a transaction
// that was pre-bootstrap for some range (so redundant and we may have gone ahead of), but had to be executed locally
// for another range
CommandStore unsafeStore = safeStore.commandStore();
long t0 = safeStore.time().now();
return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn())
.flatMap(unused -> unsafeStore.submit(context, ss -> {
Command cmd = ss.get(txnId).current();
if (!cmd.hasBeen(Applied))
ss.agent().metricsEventsListener().onApplied(cmd, t0);
postApply(ss, txnId);
return null;
}));
}
private static void apply(SafeCommandStore safeStore, Command.Executed command)
{
CommandStore unsafeStore = safeStore.commandStore();
TxnId txnId = command.txnId();
// TODO (expected): there is some coupling going on here - concept of TIMESTAMPS only needed if implementation tracks on apply
PreLoadContext context = contextFor(command.txnId(), command.partialTxn().keys(), TIMESTAMPS);
// this is sometimes called from a listener update, which will not have the keys in context
if (safeStore.canExecuteWith(context))
{
applyChain(safeStore, context, txnId).begin(safeStore.agent());
}
else
{
unsafeStore.submit(context, ss -> {
applyChain(ss, context, txnId).begin(ss.agent());
return null;
}).begin(safeStore.agent());
}
}
// TODO (expected, API consistency): maybe split into maybeExecute and maybeApply?
private static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
{
Command command = safeCommand.current();
if (logger.isTraceEnabled())
logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", command.txnId(), command.status(), alwaysNotifyListeners);
if (command.status() != Stable && command.status() != PreApplied)
{
if (alwaysNotifyListeners)
safeStore.notifyListeners(safeCommand);
return false;
}
if (command.asCommitted().isWaitingOnDependency())
{
if (alwaysNotifyListeners)
safeStore.notifyListeners(safeCommand);
if (notifyWaitingOn)
new NotifyWaitingOn(safeCommand).accept(safeStore);
return false;
}
// TODO (required): slice our execute ranges based on any pre-bootstrap state
// FIXME: need to communicate to caller that we didn't execute if we take one of the above paths
switch (command.status())
{
case Stable:
// TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
command = safeCommand.readyToExecute(safeStore);
logger.trace("{}: set to ReadyToExecute", command.txnId());
safeStore.progressLog().readyToExecute(command);
safeStore.notifyListeners(safeCommand);
return true;
case PreApplied:
Ranges executeRanges = executeRanges(safeStore, command.executeAt());
Command.Executed executed = command.asExecuted();
boolean intersects = executed.writes().keys.intersects(executeRanges);
if (intersects)
{
safeCommand.applying(safeStore);
logger.trace("{}: applying", command.txnId());
apply(safeStore, executed);
return true;
}
else
{
// TODO (desirable, performance): This could be performed immediately upon Committed
// but: if we later support transitive dependency elision this could be dangerous
logger.trace("{}: applying no-op", command.txnId());
safeCommand.applied(safeStore);
if (command.txnId().kind() == ExclusiveSyncPoint)
{
Ranges ranges = safeStore.ranges().allAt(command.txnId().epoch());
ranges = command.route().slice(ranges, Minimal).participants().toRanges();
safeStore.commandStore().markExclusiveSyncPointLocallyApplied(safeStore, command.txnId(), ranges);
}
safeStore.notifyListeners(safeCommand);
return true;
}
default:
throw illegalState("Unexpected status: " + command.status());
}
}
protected static WaitingOn initialiseWaitingOn(SafeCommandStore safeStore, TxnId waitingId, Timestamp executeWaitingAt, PartialDeps deps, Route<?> route)
{
if (waitingId.kind().awaitsOnlyDeps())
executeWaitingAt = Timestamp.maxForEpoch(waitingId.epoch());
Ranges ranges = safeStore.ranges().allAt(executeWaitingAt);
Unseekables<?> executionParticipants = route.participants().slice(ranges, Minimal);
WaitingOn.Update update = new WaitingOn.Update(deps);
deps.keyDeps.forEach(ranges, 0, deps.keyDeps.txnIdCount(), update, null, (u, v, i) -> {
u.initialiseWaitingOnCommit(i);
});
// we select range deps on actual participants rather than covered ranges,
// since we may otherwise adopt false dependencies for range txns
deps.rangeDeps.forEach(executionParticipants, update, (u, i) -> {
u.initialiseWaitingOnCommit(u.deps.keyDeps.txnIdCount() + i);
});
return updateWaitingOn(safeStore, waitingId, executeWaitingAt, update, route.participants()).build();
}
protected static WaitingOn.Update updateWaitingOn(SafeCommandStore safeStore, TxnId waitingId, Timestamp executeAt, WaitingOn.Update update, Participants<?> participants)
{
CommandStore commandStore = safeStore.commandStore();
TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
if (minWaitingOnTxnId != null && commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), executeAt, participants))
safeStore.commandStore().removeRedundantDependencies(participants, update);
update.forEachWaitingOnCommit(safeStore, update, waitingId, executeAt, (safeStore0, upd, id, exec, i) -> {
// TODO (expected): load read-only to reduce overhead; upgrade only if we need to remove listener
SafeCommand dep = safeStore0.ifLoadedAndInitialised(upd.deps.txnId(i));
if (dep == null || !dep.current().hasBeen(PreCommitted))
return;
updateWaitingOn(safeStore0, id, exec, upd, dep);
});
update.forEachWaitingOnApply(safeStore, update, waitingId, executeAt, (store, upd, id, exec, i) -> {
SafeCommand dep = store.ifLoadedAndInitialised(upd.deps.txnId(i));
if (dep == null || !dep.current().hasBeen(PreCommitted))
return;
updateWaitingOn(store, id, exec, upd, dep);
});
return update;
}
/**
* @param dependencySafeCommand is either committed truncated, or invalidated
* @return true iff {@code maybeExecute} might now have a different outcome
*/
private static boolean updateWaitingOn(SafeCommandStore safeStore, TxnId waitingId, Timestamp executeWaitingAt, WaitingOn.Update waitingOn, SafeCommand dependencySafeCommand)
{
Command dependency = dependencySafeCommand.current();
Invariants.checkState(dependency.hasBeen(PreCommitted));
TxnId dependencyId = dependency.txnId();
if (waitingId.kind().awaitsOnlyDeps() && dependency.known().executeAt == ExecuteAtKnown && dependency.executeAt().compareTo(waitingId) > 0)
waitingOn.updateExecuteAtLeast(dependency.executeAt());
if (dependency.hasBeen(Truncated))
{
switch (dependency.saveStatus())
{
default: throw new AssertionError("Unhandled saveStatus: " + dependency.saveStatus());
case TruncatedApply:
case TruncatedApplyWithOutcome:
case TruncatedApplyWithDeps:
Invariants.checkState(dependency.executeAt().compareTo(executeWaitingAt) < 0 || waitingId.kind().awaitsOnlyDeps());
case ErasedOrInvalidated:
case Erased:
logger.trace("{}: {} is truncated. Stop listening and removing from waiting on commit set.", waitingId, dependencyId);
break;
case Invalidated:
logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", waitingId, dependencyId);
}
dependencySafeCommand.removeListener(new ProxyListener(waitingId));
return waitingOn.setAppliedOrInvalidated(dependencyId);
}
else if (dependency.executeAt().compareTo(executeWaitingAt) > 0 && !waitingId.kind().awaitsOnlyDeps())
{
// dependency cannot be a predecessor if it executes later
logger.trace("{}: {} executes after us. Stop listening and removing from waiting on apply set.", waitingId, dependencyId);
dependencySafeCommand.removeListener(new ProxyListener(waitingId));
return waitingOn.removeWaitingOn(dependencyId);
}
else if (dependency.hasBeen(Applied))
{
logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", waitingId, dependencyId);
dependencySafeCommand.removeListener(new ProxyListener(waitingId));
return waitingOn.setAppliedAndPropagate(dependencyId, dependency.asCommitted().waitingOn());
}
else if (waitingOn.removeWaitingOnCommit(dependencyId))
{
logger.trace("{}: adding {} to waiting on apply set.", waitingId, dependencyId);
boolean addedWaitingOnApply = waitingOn.addWaitingOnApply(dependencyId);
Invariants.checkState(addedWaitingOnApply);
return true;
}
else if (waitingOn.isWaitingOnApply(dependencyId))
{
return false;
}
else if (safeStore.isFullyPreBootstrapOrStale(dependency, waitingOn.deps.participants(dependency.txnId())))
{
// TODO (expected): erase the dependency or otherwise prevent from executing
return false;
}
else
{
throw illegalState("We have a dependency to wait on, but have already finished waiting");
}
}
static void updateDependencyAndMaybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, SafeCommand predecessor, boolean notifyWaitingOn)
{
Command.Committed command = safeCommand.current().asCommitted();
if (command.hasBeen(Applied))
return;
WaitingOn.Update waitingOn = new WaitingOn.Update(command);
if (updateWaitingOn(safeStore, command.txnId(), command.executeAt(), waitingOn, predecessor))
{
safeCommand.updateWaitingOn(waitingOn);
maybeExecute(safeStore, safeCommand, false, notifyWaitingOn);
}
else
{
Command pred = predecessor.current();
if (pred.hasBeen(PreCommitted))
{
TxnId nextWaitingOn = command.waitingOn().nextWaitingOn();
if (nextWaitingOn != null && nextWaitingOn.equals(pred.txnId()))
safeStore.progressLog().waiting(predecessor, LocalExecution.Applied, pred.route(), null);
}
}
}
// TODO (now): document and justify all calls
public static void setTruncatedApply(SafeCommandStore safeStore, SafeCommand safeCommand)
{
setTruncatedApply(safeStore, safeCommand, null, null);
}
public static void setTruncatedApply(SafeCommandStore safeStore, SafeCommand safeCommand, @Nullable Timestamp executeAt, Route<?> maybeFullRoute)
{
Command command = safeCommand.current();
if (command.saveStatus().compareTo(TruncatedApply) >= 0) return;
FullRoute<?> route = Route.tryCastToFullRoute(maybeFullRoute);
if (route == null) route = Route.tryCastToFullRoute(command.route());
if (executeAt == null) executeAt = command.executeAtIfKnown();
if (route == null || executeAt == null)
{
safeCommand.update(safeStore, null, erased(command));
}
else
{
CommonAttributes attributes = command.mutable().route(route);
if (!safeCommand.txnId().kind().awaitsOnlyDeps())
{
safeCommand.update(safeStore, null, truncatedApply(attributes, TruncatedApply, executeAt, null, null));
}
else if (safeCommand.current().saveStatus().hasBeen(Applied))
{
Timestamp executesAtLeast = safeCommand.current().executesAtLeast();
if (executesAtLeast == null) safeCommand.update(safeStore, null, erased(command));
else safeCommand.update(safeStore, null, truncatedApply(attributes, TruncatedApply, executeAt, null, null, executesAtLeast));
}
}
}
public static void setErased(SafeCommandStore safeStore, SafeCommand safeCommand)
{
Listeners.Immutable durableListeners = safeCommand.current().durableListeners();
Command command = purge(safeStore, safeCommand, null, null, ERASE, true);
safeStore.notifyListeners(safeCommand, command, durableListeners, safeCommand.transientListeners());
}
/**
* Purge all or part of the metadata for a Commmand
*/
public static Command purge(SafeCommandStore safeStore, SafeCommand safeCommand, @Nullable Seekables<?, ?> keysOrRanges, @Nullable Unseekables<?> maybeFullRoute, Cleanup cleanup, boolean notifyListeners)
{
Command command = safeCommand.current();
// 1) a command has been applied; or
// 2) has been coordinated but *will not* be applied (we just haven't witnessed the invalidation yet); or
// 3) a command is durably decided and this shard only hosts its home data, so no explicit truncation is necessary to remove it
// TODO (desired): consider if there are better invariants we can impose for undecided transactions, to verify they aren't later committed (should be detected already, but more is better)
// note that our invariant here is imperfectly applied to keep the code cleaner: we don't verify that the caller was safe to invoke if we don't already have a route in the command and we're only PreCommitted
Invariants.checkState(command.hasBeen(Applied) || !command.hasBeen(PreCommitted)
|| (command.route() == null || Infer.safeToCleanup(safeStore, command, command.route(), command.executeAt()) || safeStore.isFullyPreBootstrapOrStale(command, command.route().participants()))
, "Command %s could not be truncated", command);
Command result;
switch (cleanup)
{
default: throw new AssertionError("Unexpected cleanup: " + cleanup);
case TRUNCATE_WITH_OUTCOME:
Invariants.checkArgument(!command.hasBeen(Truncated));
if (command.hasBeen(PreApplied))
{
result = truncatedApplyWithOutcome(command.asExecuted());
break;
}
// TODO (expected, consider): should we downgrade to no truncation in this case? Or are we stale?
case TRUNCATE:
// TODO (expected): consider passing through any information we have about the reason for loading, so we can infer APPLIED if !PreCommitted
Invariants.checkState(command.saveStatus().compareTo(TruncatedApply) < 0);
if (!command.hasBeen(PreCommitted)) result = erased(command);
else result = truncatedApply(command, Route.tryCastToFullRoute(maybeFullRoute));
break;
case ERASE:
Invariants.checkState(command.saveStatus().compareTo(Erased) < 0);
result = erased(command);
break;
}
safeCommand.update(safeStore, keysOrRanges, result);
safeStore.progressLog().clear(safeCommand.txnId());
if (notifyListeners)
safeStore.notifyListeners(safeCommand, result, command.durableListeners(), safeCommand.transientListeners());
return result;
}
public static boolean maybeCleanup(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, EpochSupplier toEpoch, Unseekables<?> maybeFullRoute)
{
Cleanup cleanup = shouldCleanup(safeStore, command, toEpoch, maybeFullRoute);
if (command.saveStatus().compareTo(cleanup.appliesIfNot) >= 0)
return false;
purge(safeStore, safeCommand, null, maybeFullRoute, cleanup, true);
return true;
}
// TODO (now): either ignore this message if we don't have a route, or else require FullRoute requiring route, or else require FullRoute
public static Command setDurability(SafeCommandStore safeStore, SafeCommand safeCommand, Durability durability, @Nullable Route<?> route, @Nullable Timestamp executeAt)
{
Command command = safeCommand.current();
if (command.is(Truncated))
return command;
if (command.durability().compareTo(durability) >= 0)
return command;
CommonAttributes attrs = route == null ? command : updateRoute(command, route);
if (executeAt != null && command.status().hasBeen(Committed) && !command.executeAt().equals(executeAt))
safeStore.agent().onInconsistentTimestamp(command, command.asCommitted().executeAt(), executeAt);
attrs = attrs.mutable().durability(durability);
command = safeCommand.updateAttributes(attrs);
if (maybeCleanup(safeStore, safeCommand, command, null, route))
return safeCommand.current();
safeStore.progressLog().durable(command);
safeStore.notifyListeners(safeCommand);
return command;
}
public static Command setDurability(SafeCommandStore safeStore, SafeCommand safeCommand, Durability durability)
{
return setDurability(safeStore, safeCommand, durability, null, null);
}
// TODO (expected): we should have a new SaveState that avoids us duplicating work that indicates we're WaitingToExecute;
// this means that once we have begun visiting a transaction on behalf of some later transaction, we don't need to do so again.
static class NotifyWaitingOn implements PreLoadContext, Consumer<SafeCommandStore>
{
LocalExecution[] blockedUntil = new LocalExecution[4];
TxnId[] txnIds = new TxnId[4];
int depth;
public NotifyWaitingOn(SafeCommand root)
{
txnIds[0] = root.txnId();
blockedUntil[0] = LocalExecution.Applied;
}
@Override
public void accept(SafeCommandStore safeStore)
{
// first do a simple loop to skip over txns that have been truncated
SafeCommand prevSafe = ifInitialised(safeStore, depth - 1);
if (depth > 0 && (prevSafe == null || prevSafe.current().hasBeen(Truncated)))
{
while (depth > 0 && (prevSafe == null || prevSafe.current().hasBeen(Truncated)))
prevSafe = ifInitialised(safeStore, --depth - 1);
}
else
{
// we know we loaded cur, so if it's null it's either truncated or we haven't witnessed it and need to initialise;
// in this case use our predecessor's intersecting keys to decide which
SafeCommand curSafe = ifInitialised(safeStore, depth);
if (curSafe == null)
{
if (prevSafe != null)
{
RedundantStatus redundantStatus = safeStore.commandStore().redundantBefore().status(txnIds[depth], prevSafe.current().executeAt(), prevSafe.current().partialDeps().participants(txnIds[depth]));
switch (redundantStatus)
{
default: throw new AssertionError("Unexpected redundant status: " + redundantStatus);
case NOT_OWNED: throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time");
case SHARD_REDUNDANT:
case LOCALLY_REDUNDANT:
case REDUNDANT_PRE_BOOTSTRAP_OR_STALE:
case PRE_BOOTSTRAP_OR_STALE:
removeRedundantDependencies(safeStore, prevSafe, txnIds[depth]);
prevSafe = get(safeStore, --depth - 1);
break;
case LIVE:
case PARTIALLY_PRE_BOOTSTRAP_OR_STALE:
initialise(safeStore, depth);
}
}
else
{
do
{
if (--depth == -1)
return; // the command must have been truncated
curSafe = prevSafe;
prevSafe = ifInitialised(safeStore, depth - 1);
} while (curSafe == null);
}
}
}
Invariants.checkState(depth == 0 || prevSafe != null);
loop: while (depth >= 0)
{
if (depth > 100000) // TODO (expected): dump more debug info to aid investigation
throw new StackOverflowError("Exploring a probably-recursive or otherwise faulty dependency graph from " + txnIds[0]);
Command prev = prevSafe != null ? prevSafe.current() : null;
SafeCommand curSafe = ifLoadedAndInitialised(safeStore, depth);
Command cur = curSafe != null ? curSafe.current() : null;
LocalExecution until = blockedUntil[depth];
if (cur == null)
{
// need to load; schedule execution for later
safeStore.commandStore().execute(this, this).begin(safeStore.agent());
return;
}
if (prev != null)
{
if (cur.isAtLeast(until) || (cur.hasBeen(PreCommitted) && cur.executeAt().compareTo(prev.executeAt()) > 0 && !prev.txnId().kind().awaitsOnlyDeps()))
{
updateDependencyAndMaybeExecute(safeStore, prevSafe, curSafe, false);
--depth;
prevSafe = get(safeStore, depth - 1);
continue;
}
}
else if (cur.isAtLeast(until))
{
// we're done; already applying
Invariants.checkState(depth == 0);
break;
}
WaitingOn waitingOn = cur.hasBeen(Stable) ? cur.asCommitted().waitingOn() : WaitingOn.EMPTY;
TxnId directlyBlockedOnCommit;
TxnId directlyBlockedOnApply = waitingOn.nextWaitingOnApply();
if (directlyBlockedOnApply != null)
{
// preferentially block on apply, as this probably saves us additional bookkeeping
// by giving other dependencies time to complete
push(directlyBlockedOnApply, LocalExecution.Applied);
prevSafe = curSafe;
}
else if ((directlyBlockedOnCommit = waitingOn.nextWaitingOnCommit()) != null)
{
push(directlyBlockedOnCommit, WaitingToExecute);
prevSafe = curSafe;
}
else
{
if (cur.hasBeen(Stable))
{
if (!cur.is(ReadyToExecute) && cur.saveStatus() != Applying && !cur.asCommitted().isWaitingOnDependency())
{
if (!maybeExecute(safeStore, curSafe, false, false))
throw new AssertionError("Is able to Apply, but has not done so");
// loop and re-test the command's status; we may still want to notify blocking, esp. if not homeShard
continue;
}
}
else if (!cur.hasBeen(PreCommitted))
until = ReadyToExclude;
Timestamp executeAt = prev == null ? cur.executeAt() : prev.executeAt();
Participants<?> participants = prev != null // TODO (desired): slightly costly to invert a large partialDeps collection
? prev.partialDeps().participants(cur.txnId()) // we do want to limit to the intersection of keys with the waiting transaction
: cur.route().participants(); // no need to slice to execution ranges, as implicitly done for us by RedundantBefore
RedundantStatus redundantStatus = safeStore.commandStore().redundantBefore().status(cur.txnId(), executeAt, participants);
switch (redundantStatus)
{
default: throw new AssertionError("Unknown redundant status: " + redundantStatus);
case NOT_OWNED: throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time");
case LIVE:
case PARTIALLY_PRE_BOOTSTRAP_OR_STALE:
logger.trace("{} blocked on {} until {}", txnIds[0], cur.txnId(), until);
safeStore.progressLog().waiting(curSafe, until, null, participants);
break loop;
case LOCALLY_REDUNDANT:
case SHARD_REDUNDANT:
if (cur.txnId().kind() == EphemeralRead)
{
removeRedundantDependencies(safeStore, curSafe, null);
maybeExecute(safeStore, curSafe, false, false);
--depth;
prevSafe = get(safeStore, depth - 1);
break;
}
case PRE_BOOTSTRAP_OR_STALE:
case REDUNDANT_PRE_BOOTSTRAP_OR_STALE:
Invariants.checkState(cur.txnId().kind() == EphemeralRead || cur.hasBeen(Applied) || !cur.hasBeen(PreCommitted) || redundantStatus == PRE_BOOTSTRAP_OR_STALE);
if (prev == null)
return;
curSafe.removeListener(prev.asListener());
// we've been applied, invalidated, or are no longer relevant
removeRedundantDependencies(safeStore, prevSafe, cur.txnId());
--depth;
prevSafe = get(safeStore, depth - 1);
}
}
}
for (int i = 1 ; i <= depth ; ++i)
initialise(safeStore, i).addListener(get(safeStore, i - 1).current().asListener());
}
private SafeCommand ifInitialised(SafeCommandStore safeStore, int i)
{
if (i < 0) return null;
return safeStore.ifInitialised(txnIds[i]);
}
private SafeCommand ifLoadedAndInitialised(SafeCommandStore safeStore, int i)
{
if (i < 0) return null;
return safeStore.ifLoadedAndInitialised(txnIds[i]);
}
private SafeCommand get(SafeCommandStore safeStore, int i)
{
if (i < 0) return null;
SafeCommand result = safeStore.ifInitialised(txnIds[i]);
Invariants.checkState(result != null);
return result;
}
private SafeCommand initialise(SafeCommandStore safeStore, int i)
{
return safeStore.get(txnIds[i]);
}
void push(TxnId by, LocalExecution until)
{
if (++depth == txnIds.length)
{
txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
blockedUntil = Arrays.copyOf(blockedUntil, txnIds.length);
}
txnIds[depth] = by;
blockedUntil[depth] = until;
}
@Override
public TxnId primaryTxnId()
{
return txnIds[0];
}
@Override
public Collection<TxnId> additionalTxnIds()
{
return Arrays.asList(txnIds).subList(1, depth + 1);
}
}
static Command removeRedundantDependencies(SafeCommandStore safeStore, SafeCommand safeCommand, @Nullable TxnId redundant)
{
CommandStore commandStore = safeStore.commandStore();
Command.Committed current = safeCommand.current().asCommitted();
WaitingOn.Update update = new WaitingOn.Update(current.waitingOn);
TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
if (minWaitingOnTxnId != null && commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), current.executeAt(), current.route().participants()))
safeStore.commandStore().removeRedundantDependencies(current.route().participants(), update);
// if we are a range transaction, being redundant for this transaction does not imply we are redundant for all transactions
if (redundant != null)
update.removeWaitingOn(redundant);
return safeCommand.updateWaitingOn(update);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public static Command informHome(SafeCommandStore safeStore, SafeCommand safeCommand, Route<?> someRoute)
{
Command command = safeCommand.current();
Invariants.checkState(owns(safeStore, command.txnId().epoch(), someRoute.homeKey()));
// TODO (expected): tighten up definition of what we know about a Route (pack it into Known) and whether we've been witnessed to decide our action here
if (command.hasBeen(PreAccepted))
return command;
Command result = safeCommand.updateAttributes(command.mutable().route(Route.merge((Route)someRoute, command.route())));
safeStore.progressLog().unwitnessed(safeCommand.txnId(), Home);
return result;
}
/**
* A key nominated to represent the "home" shard - only members of the home shard may be nominated to recover
* a transaction, to reduce the cluster-wide overhead of ensuring progress. A transaction that has only been
* witnessed at PreAccept may however trigger a process of ensuring the home shard is durably informed of
* the transaction.
*
* Note that for ProgressLog purposes the "home shard" is the shard as of txnId.epoch.
* For recovery purposes the "home shard" is as of txnId.epoch until Committed, and executeAt.epoch once Executed
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static CommonAttributes updateRoute(Command command, Route<?> route)
{
if (command.route() == null || !command.route().containsAll(route))
return command.mutable().route(Route.merge((Route)route, command.route()));
return command;
}
private static ProgressShard progressShard(Route<?> route, @Nullable RoutingKey progressKey, Ranges coordinates)
{
if (progressKey == null)
return No;
if (!coordinates.contains(progressKey))
return No;
return progressKey.equals(route.homeKey()) ? Home : Local;
}
enum EnsureAction
{
/** Don't check */
Ignore,
/** Add, but return false if insufficient for any reason */
TryAdd,
/** Supplement existing information, asserting that the existing and additional information are independently sufficient,
* returning false only if the existing information is absent AND the new information is insufficient. */
Add,
/** Set, but only return false if insufficient */
TrySet,
/** Overwrite existing information if sufficient; fail otherwise */
Set
}
@SuppressWarnings({"unchecked", "rawtypes"})
private static CommonAttributes set(CommonAttributes attrs,
Ranges existingRanges, Ranges additionalRanges,
ProgressShard shard, Route<?> route,
@Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
Invariants.checkState(shard != Unsure);
Ranges allRanges = existingRanges.with(additionalRanges);
attrs = attrs.mutable().route(Route.merge(attrs.route(), (Route)route));
// TODO (soon): stop round-robin hashing; partition only on ranges
switch (ensurePartialTxn)
{
case Add:
if (partialTxn == null)
break;
if (attrs.partialTxn() != null)
{
partialTxn = partialTxn.slice(allRanges, shard.isHome());
attrs = attrs.mutable().partialTxn(attrs.partialTxn().with(partialTxn));
break;
}
case Set:
case TrySet:
// TODO (desired): only includeQuery if shard.isHome(); this affects state eviction and is low priority given size in C*
attrs = attrs.mutable().partialTxn(partialTxn.slice(allRanges, true));
break;
}
switch (ensurePartialDeps)
{
case Add:
if (partialDeps == null)
break;
if (attrs.partialDeps() != null)
{
attrs = attrs.mutable().partialDeps(attrs.partialDeps().with(partialDeps.slice(allRanges)));
break;
}
case Set:
case TrySet:
attrs = attrs.mutable().partialDeps(partialDeps.slice(allRanges));
break;
}
return attrs;
}
/**
* Validate we have sufficient information for the route, partialTxn and partialDeps fields, and if so update them;
* otherwise return false (or throw an exception if an illegal state is encountered)
*/
private static boolean validate(Status status, CommonAttributes attrs,
Ranges existingRanges, Ranges additionalRanges, ProgressShard shard,
Route<?> route, EnsureAction ensureRoute,
@Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
return validate(status, attrs, existingRanges, additionalRanges, shard, route, ensureRoute, partialTxn, ensurePartialTxn, partialDeps, ensurePartialDeps, null);
}
private static boolean validate(Status status, CommonAttributes attrs,
Ranges existingRanges, Ranges additionalRanges, ProgressShard shard,
Route<?> route, EnsureAction ensureRoute,
@Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps,
@Nullable SafeCommandStore permitStaleMissing)
{
if (shard == Unsure)
return false;
// first validate route
switch (ensureRoute)
{
default: throw new AssertionError("Unexpected action: " + ensureRoute);
case TryAdd:
case Add:
if (!isFullRoute(attrs.route()) && !isFullRoute(route))
return false;
case Ignore:
break;
case Set:
if (!isFullRoute(route))
throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
break;
case TrySet:
if (!isFullRoute(route))
return false;
}
// invalid to Add deps to Accepted or AcceptedInvalidate statuses, as Committed deps are not equivalent
// and we may erroneously believe we have covered a wider range than we have infact covered
if (ensurePartialDeps == Add)
Invariants.checkState(status != Accepted && status != AcceptedInvalidate);
// validate new partial txn
if (!validate(ensurePartialTxn, existingRanges, additionalRanges, covers(attrs.partialTxn()), covers(partialTxn), permitStaleMissing, "txn", partialTxn))
return false;
Invariants.checkState(partialTxn == null || attrs.txnId().kind().equals(partialTxn.kind()), "Transaction has different kind to its TxnId");
Invariants.checkState(partialTxn == null || !shard.isHome() || ensurePartialTxn == Ignore || hasQuery(attrs.partialTxn()) || hasQuery(partialTxn), "Home transaction should include query");
return validate(ensurePartialDeps, existingRanges, additionalRanges, covers(attrs.partialDeps()), covers(partialDeps), permitStaleMissing, "deps", partialDeps);
}
// FIXME (immutable-state): has this been removed?
private static boolean validate(EnsureAction action, Ranges existingRanges, Ranges requiredRanges,
Ranges existing, Ranges adding, @Nullable SafeCommandStore permitStaleMissing,
String kind, Object obj)
{
switch (action)
{
default: throw illegalState("Unexpected action: " + action);
case Ignore:
return true;
case TrySet:
case Set:
if (containsAll(adding, requiredRanges, permitStaleMissing))
return true;
if (action == Set)
illegalState("Incomplete " + kind + " (" + obj + ") provided; does not cover " + requiredRanges.subtract(adding));
return false;
case TryAdd:
case Add:
if (existing == null)
{
if (adding == null)
return false; // we don't want to permit a null value for txn/deps, even if we are stale for all participating ranges, as it breaks assumptions elsewhere
if (!adding.containsAll(existingRanges))
return false;
return validate(action == TryAdd ? TrySet : Set, existingRanges, requiredRanges, existing, adding, permitStaleMissing, kind, obj);
}
Invariants.checkState(existing.containsAll(existingRanges), "Existing ranges insufficient");
if (requiredRanges == existingRanges)
return true;
if (adding == null)
return permitStaleMissing != null && containsAll(Ranges.EMPTY, requiredRanges.subtract(existing), permitStaleMissing);
requiredRanges = requiredRanges.subtract(existing);
if (containsAll(adding, requiredRanges, permitStaleMissing))
return true;
if (action == Add)
illegalState("Incomplete " + kind + " (" + obj + ") provided; does not cover " + requiredRanges.subtract(adding));
return false;
}
}
private static boolean containsAll(Ranges adding, Ranges requiredRanges, @Nullable SafeCommandStore permitStaleMissing)
{
if (adding.containsAll(requiredRanges))
return true;
if (permitStaleMissing != null)
{
Ranges staleRanges = permitStaleMissing.commandStore().redundantBefore().staleRanges();
requiredRanges = requiredRanges.subtract(staleRanges);
if (adding.containsAll(requiredRanges))
return true;
}
return false;
}
}