blob: 99d4333e0b106bf144891ba188818dbb8c47b54f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package accord.messages;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.coordinate.Infer;
import accord.local.Command;
import accord.local.Commands;
import accord.local.Node;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.messages.CheckStatus.FoundKnownMap;
import accord.messages.CheckStatus.WithQuorum;
import accord.primitives.Ballot;
import accord.primitives.EpochSupplier;
import accord.primitives.Keys;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
import javax.annotation.Nullable;
import java.util.function.BiConsumer;
import static accord.coordinate.Infer.InvalidIfNot.NotKnownToBeInvalid;
import static accord.local.RedundantStatus.LOCALLY_REDUNDANT;
import static accord.local.SaveStatus.Stable;
import static accord.local.Status.NotDefined;
import static accord.local.Status.Phase.Cleanup;
import static accord.local.Status.PreApplied;
import static accord.messages.CheckStatus.WithQuorum.HasQuorum;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.illegalState;
public class Propagate implements EpochSupplier, LocalRequest<Status.Known>
public static class SerializerSupport
public static Propagate create(TxnId txnId, Route<?> route, SaveStatus maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot ballot, Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey, Status.Known achieved, FoundKnownMap known, boolean isTruncated, PartialTxn partialTxn, PartialDeps committedDeps, long toEpoch, Timestamp executeAt, Writes writes, Result result)
return new Propagate(txnId, route, maxKnowledgeSaveStatus, maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known, isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result, null);
// The transaction being propagated; its epoch is the lower bound of the epoch range handed to mapReduceConsumeLocal in process(Node).
public final TxnId txnId;
// Route used both to select the local command stores in process(Node) and to look up the command in apply().
public final Route<?> route;
// TODO (expected): remove dependency on these two SaveStatus
// Only read by toString() (reported as "saveStatus") in the visible code.
public final SaveStatus maxKnowledgeSaveStatus;
// apply() checks whether this is in the Cleanup phase before truncating a command that is already up to date.
public final SaveStatus maxSaveStatus;
// Forwarded to Commands.commit when propagating a Stable/ReadyToExecute status.
public final Ballot ballot;
// Durability information; consulted by apply()'s cleanup check and forwarded by updateDurability().
public final Status.Durability durability;
// May be null, in which case updateDurability() does nothing.
@Nullable public final RoutingKey homeKey;
// Forwarded to Commands.apply / Commands.commit / Commands.preaccept.
@Nullable public final RoutingKey progressKey;
// this is a WHOLE NODE measure, so if commit epoch has more ranges we do not count as committed if we can only commit in coordination epoch
// This is also the value reported to the callback by accept() when processing succeeds.
public final Status.Known achieved;
// Per-range knowledge; queried by applyOrUpgradeTruncated() via hasTruncated(...) and knownFor(...).
public final FoundKnownMap known;
// When true, apply() routes through applyOrUpgradeTruncated() before deciding what to propagate.
public final boolean isShardTruncated;
// Transaction definition to propagate; apply() only uses it when the achieved knowledge includes the definition.
@Nullable public final PartialTxn partialTxn;
// Dependencies to propagate; apply() only uses them when the achieved knowledge includes decided deps.
@Nullable public final PartialDeps stableDeps;
// Upper bound of the epoch range processed; also the value returned by epoch().
public final long toEpoch;
// Passed to command.executeAtIfKnown(...) in apply() and forwarded to Commands.setDurability.
@Nullable public final Timestamp committedExecuteAt;
// Forwarded to Commands.apply together with result.
@Nullable public final Writes writes;
@Nullable public final Result result;
// Completion callback invoked by accept(); may be null (accept() checks). Transient, and replaced by process(Node, BiConsumer).
protected transient BiConsumer<Status.Known, Throwable> callback;
public BiConsumer<Status.Known, Throwable> callback()
return callback;
/**
 * Creates a propagation request by storing every argument verbatim in the field of the same name.
 *
 * NOTE(review): the constructor header was missing from this file (only the parameter list survived);
 * it is restored here as package-private, which satisfies the callers visible in this class
 * ({@code SerializerSupport.create} and {@code propagate}). Confirm the intended visibility.
 *
 * @param callback completion callback, or {@code null} if none (as passed by {@code SerializerSupport.create})
 */
Propagate(
    TxnId txnId,
    Route<?> route,
    SaveStatus maxKnowledgeSaveStatus,
    SaveStatus maxSaveStatus,
    Ballot ballot,
    Status.Durability durability,
    @Nullable RoutingKey homeKey,
    @Nullable RoutingKey progressKey,
    Status.Known achieved,
    FoundKnownMap known,
    boolean isShardTruncated,
    @Nullable PartialTxn partialTxn,
    @Nullable PartialDeps stableDeps,
    long toEpoch,
    @Nullable Timestamp committedExecuteAt,
    @Nullable Writes writes,
    @Nullable Result result,
    BiConsumer<Status.Known, Throwable> callback)
{
    this.txnId = txnId;
    this.route = route;
    this.maxKnowledgeSaveStatus = maxKnowledgeSaveStatus;
    this.maxSaveStatus = maxSaveStatus;
    this.ballot = ballot;
    this.durability = durability;
    this.homeKey = homeKey;
    this.progressKey = progressKey;
    this.achieved = achieved;
    this.known = known;
    this.isShardTruncated = isShardTruncated;
    this.partialTxn = partialTxn;
    this.stableDeps = stableDeps;
    this.toEpoch = toEpoch;
    this.committedExecuteAt = committedExecuteAt;
    this.writes = writes;
    this.result = result;
    this.callback = callback;
}
public void process(Node on, BiConsumer<Status.Known, Throwable> callback)
this.callback = callback;
@SuppressWarnings({"rawtypes", "unchecked"})
public static void propagate(Node node, TxnId txnId, long sourceEpoch, WithQuorum withQuorum, Route route, @Nullable Status.Known target, CheckStatus.CheckStatusOkFull full, BiConsumer<Status.Known, Throwable> callback)
if (full.maxKnowledgeSaveStatus.status == NotDefined && full.maxInvalidIfNot() == NotKnownToBeInvalid)
callback.accept(Status.Known.Nothing, null);
Invariants.checkState(sourceEpoch == txnId.epoch() || (full.executeAt != null && sourceEpoch == full.executeAt.epoch()) || full.maxKnowledgeSaveStatus == SaveStatus.Erased || full.maxKnowledgeSaveStatus == SaveStatus.ErasedOrInvalidated);
full = full.finish(route, withQuorum);
route = Invariants.nonNull(full.route);
// TODO (required): permit individual shards that are behind to catch up by themselves
long toEpoch = sourceEpoch;
Ranges sliceRanges = node.topology().localRangesForEpochs(txnId.epoch(), toEpoch);
RoutingKey progressKey = node.trySelectProgressKey(txnId, route);
Ranges covering = route.sliceCovering(sliceRanges, Minimal);
Participants<?> participatingKeys = route.participants().slice(covering, Minimal);
Status.Known achieved = full.knownFor(participatingKeys);
if (achieved.executeAt.isDecidedAndKnownToExecute() && full.executeAt.epoch() > toEpoch)
Ranges acceptRanges;
if (!node.topology().hasEpoch(full.executeAt.epoch()) ||
(!route.covers(acceptRanges = node.topology().localRangesForEpochs(txnId.epoch(), full.executeAt.epoch()))))
// we don't know what the execution epoch requires, so we cannot be sure we can replicate it locally
// we *could* wait until we have the local epoch before running this
Status.Outcome outcome = achieved.outcome.propagatesBetweenShards() ? achieved.outcome : Status.Outcome.Unknown;
achieved = new Status.Known(achieved.route, achieved.definition, achieved.executeAt, Status.KnownDeps.DepsUnknown, outcome);
// TODO (expected): this should only be the two precise epochs, not the full range of epochs
sliceRanges = acceptRanges;
covering = route.sliceCovering(sliceRanges, Minimal);
participatingKeys = route.participants().slice(covering, Minimal);
Status.Known knownForExecution = full.knownFor(participatingKeys);
if ((target != null && target.isSatisfiedBy(knownForExecution)) || achieved.isSatisfiedBy(knownForExecution))
achieved = knownForExecution;
toEpoch = full.executeAt.epoch();
{ // TODO (expected): does downgrading this ever block progress?
Invariants.checkState(sourceEpoch == txnId.epoch(), "%d != %d", sourceEpoch, txnId.epoch());
achieved = new Status.Known(achieved.route, achieved.definition, achieved.executeAt, knownForExecution.deps, knownForExecution.outcome);
boolean isShardTruncated = withQuorum == HasQuorum && full.isTruncatedResponse(covering);
PartialTxn partialTxn = full.partialTxn;
if (achieved.definition.isKnown())
partialTxn = full.partialTxn.slice(sliceRanges, true).reconstitutePartial(covering);
PartialDeps stableDeps = full.stableDeps;
if (achieved.deps.hasDecidedDeps())
stableDeps = full.stableDeps.slice(sliceRanges).reconstitutePartial(covering);
Propagate propagate =
new Propagate(txnId, route, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.acceptedOrCommitted, full.durability, full.homeKey, progressKey, achieved,, isShardTruncated, partialTxn, stableDeps, toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback);
public TxnId primaryTxnId()
return txnId;
public Seekables<?, ?> keys()
if (partialTxn != null)
return partialTxn.keys();
if (stableDeps != null)
return stableDeps.keyDeps.keys();
return Keys.EMPTY;
public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
throw new UnsupportedOperationException();
public void process(Node on, Node.Id from, ReplyContext replyContext)
throw new UnsupportedOperationException();
public void process(Node node)
node.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
public Void apply(SafeCommandStore safeStore)
SafeCommand safeCommand = safeStore.get(txnId, this, route);
Command command = safeCommand.current();
PartialTxn partialTxn = achieved.hasDefinition() ? this.partialTxn : null;
PartialDeps stableDeps = achieved.hasDecidedDeps() ? this.stableDeps : null;
switch (command.saveStatus().phase)
// Already know the outcome, waiting on durability so maybe update with new durability information which can also trigger cleanup
case Persist: return updateDurability(safeStore, safeCommand);
case Cleanup: return null;
Timestamp executeAtIfKnown = command.executeAtIfKnown(committedExecuteAt);
Status.Known achieved = this.achieved;
if (isShardTruncated)
// TODO (required): do not markShardStale for reads; in general optimise handling of case where we cannot recover a known no-op transaction
achieved = applyOrUpgradeTruncated(safeStore, safeCommand, command, executeAtIfKnown);
if (achieved == null)
return null;
Ranges needed = safeStore.ranges().allBetween(txnId.epoch(), (executeAtIfKnown == null ? txnId : executeAtIfKnown).epoch());
if (achieved.isDefinitionKnown() && partialTxn == null)
PartialTxn existing = command.partialTxn();
Ranges neededForDefinition = existing == null ? needed : needed.subtract(existing.covering());
partialTxn = this.partialTxn.slice(needed, true).reconstitutePartial(neededForDefinition);
if (achieved.hasDecidedDeps() && stableDeps == null)
Invariants.checkState(executeAtIfKnown != null);
// we don't subtract existing partialDeps, as they cannot be committed deps; we only permit committing deps covering all participating ranges
stableDeps = this.stableDeps.slice(needed).reconstitutePartial(needed);
Status propagate = achieved.atLeast(command.known()).propagatesStatus();
if (command.hasBeen(propagate))
if (maxSaveStatus.phase == Cleanup && durability.isDurableOrInvalidated() && Infer.safeToCleanup(safeStore, command, route, executeAtIfKnown))
Commands.setTruncatedApply(safeStore, safeCommand);
// TODO (expected): maybe stale?
return updateDurability(safeStore, safeCommand);
switch (propagate)
default: throw illegalState("Unexpected status: " + propagate);
case Truncated: throw illegalState("Status expected to be handled elsewhere: " + propagate);
case Accepted:
case AcceptedInvalidate:
// we never "propagate" accepted statuses as these are essentially votes,
// and contribute nothing to our local state machine
throw illegalState("Invalid states to propagate: " + propagate);
case Invalidated:
Commands.commitInvalidate(safeStore, safeCommand, route);
case Applied:
case PreApplied:
Invariants.checkState(executeAtIfKnown != null);
if (toEpoch >= executeAtIfKnown.epoch())
confirm(Commands.apply(safeStore, safeCommand, txnId, route, progressKey, executeAtIfKnown, stableDeps, partialTxn, writes, result));
case Stable:
case ReadyToExecute:
confirm(Commands.commit(safeStore, safeCommand, Stable, ballot, txnId, route, progressKey, partialTxn, executeAtIfKnown, stableDeps));
case Committed:
// TODO (expected): we can propagate Committed as Stable if we have any other Stable result AND a quorum of committedDeps
case PreCommitted:
confirm(Commands.precommit(safeStore, safeCommand, txnId, executeAtIfKnown, route));
// TODO (desired): would it be clearer to yield a SaveStatus so we can have PreCommittedWithDefinition
if (!achieved.definition.isKnown())
case PreAccepted:
// only preaccept if we coordinate the transaction
if (safeStore.ranges().coordinates(txnId).intersects(route) && Route.isFullRoute(route))
Commands.preaccept(safeStore, safeCommand, txnId, txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
case NotDefined:
return updateDurability(safeStore, safeCommand);
// if can only propagate Truncated, we might be stale; try to upgrade for this command store only, even partially if necessary
// note: this is invoked if the command is truncated for ANY local command store - we might
private Status.Known applyOrUpgradeTruncated(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, Timestamp executeAtIfKnown)
if (Infer.safeToCleanup(safeStore, command, route, executeAtIfKnown))
// don't create a new Erased record if we're already cleaned up
Commands.setErased(safeStore, safeCommand);
return null;
Ranges ranges = safeStore.ranges().allBetween(txnId.epoch(), (executeAtIfKnown == null ? txnId : executeAtIfKnown).epoch());
Participants<?> participants = route.participants(ranges, Minimal);
Invariants.checkState(!participants.isEmpty()); // we shouldn't be fetching data for transactions we only coordinate
boolean isTruncatedForLocalRanges = known.hasTruncated(participants);
if (!isTruncatedForLocalRanges)
// we're truncated *somewhere* but not locally; whether we have the executeAt is immaterial to this calculus,
// as we're either ready to go or we're waiting on the coordinating shard to complete this transaction, so pick
// the maximum we can achieve and return that
return known.knownFor(participants);
// if our peers have truncated this command, then either:
// 1) we have already applied it locally; 2) the command doesn't apply locally; 3) we are stale; or 4) the command is invalidated
// we're now at least partially stale, but let's first see if we can progress this shard, or we can do so in part
if (executeAtIfKnown == null)
// TODO (desired): merge this with the executeAt known path where possible, as a lot of duplication
if (safeStore.commandStore().redundantBefore().status(txnId, txnId, participants).compareTo(LOCALLY_REDUNDANT) >= 0)
// invalidated
Commands.setErased(safeStore, safeCommand);
return null;
ranges = safeStore.commandStore().redundantBefore().everExpectToExecute(txnId, ranges);
if (!ranges.isEmpty())
// we don't even know the execution time, so we cannot possibly proceed besides erasing the command state and marking ourselves stale
// TODO (required): we could in principle be stale for future epochs we haven't witnessed yet. Ensure up to date epochs before finalising this application, or else fetch a maximum possible epoch
safeStore.commandStore().markShardStale(safeStore, txnId, participants.toRanges().slice(ranges, Minimal), false);
Commands.setErased(safeStore, safeCommand);
return null;
if (safeStore.commandStore().redundantBefore().status(txnId, executeAtIfKnown, participants).compareTo(LOCALLY_REDUNDANT) >= 0)
// invalidated
Commands.setErased(safeStore, safeCommand);
return null;
// compute the ranges we expect to execute - i.e. those we own, and are not stale or pre-bootstrap
ranges = safeStore.commandStore().redundantBefore().expectToExecute(txnId, executeAtIfKnown, ranges);
if (ranges.isEmpty())
// TODO (expected): we might prefer to adopt Redundant status, and permit ourselves to later accept the result of the execution and/or definition
Commands.setTruncatedApply(safeStore, safeCommand, route);
return null;
participants = participants.slice(ranges, Minimal);
if (participants.isEmpty())
// we only coordinate this transaction, so being unable to retrieve its state does not imply any staleness
// TODO (now): double check this doesn't stop us coordinating the transaction (it shouldn't, as doesn't imply durability)
Commands.setTruncatedApply(safeStore, safeCommand, route);
return null;
// if the command has been truncated globally, then we should expect to apply it
// if we cannot obtain enough information from a majority to do so then we have been left behind
Status.Known required = PreApplied.minKnown;
Status.Known requireExtra = required.subtract(command.known()); // the extra information we need to reach pre-applied
Ranges achieveRanges = known.knownFor(requireExtra, ranges); // the ranges for which we can already successfully achieve this
// any ranges we execute but cannot achieve the pre-applied status for have been left behind and are stale
Ranges staleRanges = ranges.subtract(achieveRanges);
Participants<?> staleParticipants = participants.slice(staleRanges, Minimal);
staleRanges = staleParticipants.toRanges();
if (staleRanges.isEmpty())
return required;
safeStore.commandStore().markShardStale(safeStore, executeAtIfKnown, staleRanges, true);
if (!staleRanges.containsAll(participants))
return required;
// TODO (expected): we might prefer to adopt Redundant status, and permit ourselves to later accept the result of the execution and/or definition
Commands.setTruncatedApply(safeStore, safeCommand, route);
return null;
* If there is new information about the command being durable and we are in the coordination shard in the coordination epoch then update the durability information and possibly cleanup
private Void updateDurability(SafeCommandStore safeStore, SafeCommand safeCommand)
// TODO (expected): Infer durability status from cleanup/truncation
if (!durability.isDurable() || homeKey == null)
return null;
Commands.setDurability(safeStore, safeCommand, durability, route, committedExecuteAt);
return null;
public Void reduce(Void o1, Void o2)
return null;
public void accept(Void result, Throwable failure)
if (null != callback)
callback.accept(failure == null ? achieved : null, failure);
public MessageType type()
// TODO (now): this logic doesn't work now we permit upgrading; need to pick the maximum *possible* status we might propagate
// might be better to
switch (achieved.propagatesStatus())
case Applied:
case PreApplied:
if (toEpoch >= committedExecuteAt.epoch())
return MessageType.PROPAGATE_APPLY_MSG;
case Committed:
case ReadyToExecute:
return MessageType.PROPAGATE_STABLE_MSG;
case PreCommitted:
if (!achieved.definition.isKnown())
return MessageType.PROPAGATE_OTHER_MSG;
case PreAccepted:
return MessageType.PROPAGATE_OTHER_MSG;
public long epoch()
return toEpoch;
private static void confirm(Commands.CommitOutcome outcome)
switch (outcome)
default: throw illegalState("Unknown outcome: " + outcome);
case Redundant:
case Success:
case Insufficient: throw illegalState("Should have enough information");
private static void confirm(Commands.ApplyOutcome outcome)
switch (outcome)
default: throw illegalState("Unknown outcome: " + outcome);
case Redundant:
case Success:
case Insufficient: throw illegalState("Should have enough information");
public String toString()
return "Propagate{type:" + type() +
", txnId: " + txnId +
", saveStatus: " + maxKnowledgeSaveStatus +
", deps: " + stableDeps +
", txn: " + partialTxn +
", executeAt: " + committedExecuteAt +
", writes:" + writes +
", result:" + result +