| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.local; |
| |
| import accord.api.ProgressLog.ProgressShard; |
| import accord.api.Result; |
| import accord.api.RoutingKey; |
| import accord.local.Command.WaitingOn; |
| import accord.primitives.*; |
| import accord.utils.Invariants; |
| import accord.utils.async.AsyncChain; |
| import accord.utils.async.AsyncChains; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import java.util.Arrays; |
| import java.util.Map; |
| import java.util.function.Consumer; |
| |
| import static accord.api.ProgressLog.ProgressShard.*; |
| import static accord.api.ProgressLog.ProgressShard.Local; |
| import static accord.local.Commands.EnsureAction.*; |
| import static accord.local.Status.*; |
| import static accord.local.Status.Known.ExecuteAtOnly; |
| import static accord.primitives.Route.isFullRoute; |
| |
| public class Commands |
| { |
/** Logger shared by every command state transition implemented in this class. */
private static final Logger logger = LoggerFactory.getLogger(Commands.class);

/** Not instantiable: every operation on this class is static. */
private Commands() {}
| |
| private static Ranges covers(@Nullable PartialTxn txn) |
| { |
| return txn == null ? null : txn.covering(); |
| } |
| |
| private static Ranges covers(@Nullable PartialDeps deps) |
| { |
| return deps == null ? null : deps.covering; |
| } |
| |
| private static boolean hasQuery(PartialTxn txn) |
| { |
| return txn != null && txn.query() != null; |
| } |
| |
| /** |
| * true iff this commandStore owns the given key on the given epoch |
| */ |
| public static boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey) |
| { |
| return safeStore.ranges().at(epoch).contains(someKey); |
| } |
| |
/**
 * @return the sentinel {@code NO_PROGRESS_KEY}. Within this class it is compared by reference and recorded as a
 *         command's progress key when no real progress key applies; {@code progressShard} maps it to
 *         {@code ProgressShard.No}.
 */
public static RoutingKey noProgressKey()
{
    return NO_PROGRESS_KEY;
}
| |
/**
 * Outcome of a preaccept, recover, accept or accept-invalidate request.
 */
public enum AcceptOutcome
{
    /** the request was applied */
    Success,
    /** the command had already progressed beyond the point this request addresses */
    Redundant,
    /** a higher ballot had already been promised, so the request was refused */
    RejectedBallot
}
| |
| public static AcceptOutcome preaccept(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey) |
| { |
| return preacceptOrRecover(safeStore, txnId, partialTxn, route, progressKey, Ballot.ZERO); |
| } |
| |
/**
 * Preaccept {@code txnId} on behalf of a recovery coordinator at {@code ballot}.
 * NOTE(review): {@code ballot} is presumably non-zero, but this is not checked here. Any ballot other than
 * {@link Ballot#ZERO} makes {@code preacceptOrRecover} assign a fresh unique timestamp rather than proposing a
 * fast-path executeAt.
 */
public static AcceptOutcome recover(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
    return preacceptOrRecover(safeStore, txnId, partialTxn, route, progressKey, ballot);
}
| |
/**
 * Shared implementation of {@link #preaccept} (with {@link Ballot#ZERO}) and {@link #recover} (with a recovery ballot).
 *
 * <p>The request is rejected if a higher ballot has already been promised. If the command's definition is already
 * known, only the promised ballot is updated. Otherwise the following are recorded: the home/progress keys, the
 * route and the partial transaction. Then, if no executeAt has been chosen yet, one is proposed and the progress
 * log is told that the command was preaccepted.
 *
 * @return {@link AcceptOutcome#RejectedBallot} if a higher ballot was already promised;
 *         {@link AcceptOutcome#Redundant} if the definition was already known and {@code ballot} is {@link Ballot#ZERO};
 *         otherwise {@link AcceptOutcome#Success}
 * @throws IllegalStateException if the supplied route / partial transaction fail validation
 */
private static AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
    SafeCommand safeCommand = safeStore.command(txnId);
    Command command = safeCommand.current();

    // refuse to participate if we have already promised a higher ballot
    int compareBallots = command.promised().compareTo(ballot);
    if (compareBallots > 0)
    {
        logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId, command.promised());
        return AcceptOutcome.RejectedBallot;
    }

    if (command.known().definition.isKnown())
    {
        Invariants.checkState(command.status() == Invalidated || command.executeAt() != null);
        logger.trace("{}: skipping preaccept - already known ({})", txnId, command.status());
        // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
        // preaccept; in the former case we should abandon coordination, and in the latter we have already completed
        safeCommand.updatePromised(ballot);
        return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
    }

    Ranges coordinateRanges = coordinateRanges(safeStore, command);
    Invariants.checkState(!coordinateRanges.isEmpty());
    CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
    ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);
    // validate before recording anything: route and partialTxn use EnsureAction Set, deps are Ignored at this stage
    if (!validate(command.status(), attrs, Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
        throw new IllegalStateException();

    // FIXME: this should go into a consumer method
    attrs = set(safeStore, command, attrs, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
    if (command.executeAt() == null)
    {
        // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
        //  - use a global logical clock to issue new timestamps; or
        //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
        // if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
        // invalidate any transactions that were not completed by their initial coordinator
        Timestamp executeAt = ballot.equals(Ballot.ZERO)
                ? safeStore.preaccept(txnId, partialTxn.keys())
                : safeStore.time().uniqueNow(txnId);
        command = safeCommand.preaccept(attrs, executeAt, ballot);
        safeStore.progressLog().preaccepted(command, shard);
    }
    else
    {
        // an executeAt is already known, so only the definition is recorded here
        // TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
        safeCommand.markDefined(attrs, ballot);
    }

    safeStore.notifyListeners(safeCommand);
    return AcceptOutcome.Success;
}
| |
| public static boolean preacceptInvalidate(SafeCommandStore safeStore, TxnId txnId, Ballot ballot) |
| { |
| SafeCommand safeCommand = safeStore.command(txnId); |
| Command command = safeCommand.current(); |
| if (command.promised().compareTo(ballot) > 0) |
| { |
| logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", command.txnId(), command.promised()); |
| return false; |
| } |
| safeCommand.updatePromised(ballot); |
| return true; |
| } |
| |
/**
 * Record that {@code txnId} has been accepted at {@code ballot} with the proposed {@code executeAt} and
 * {@code partialDeps}, then inform the progress log and notify listeners.
 *
 * @param keys the transaction's keys; they are registered against the accept ranges if the definition is not yet known
 * @return {@link AcceptOutcome#RejectedBallot} if a higher ballot was already promised,
 *         {@link AcceptOutcome#Redundant} if the command has already been pre-committed,
 *         otherwise {@link AcceptOutcome#Success}
 * @throws AssertionError if the supplied route / deps fail validation
 */
public static AcceptOutcome accept(SafeCommandStore safeStore, TxnId txnId, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
{
    SafeCommand safeCommand = safeStore.command(txnId);
    Command command = safeCommand.current();
    if (command.promised().compareTo(ballot) > 0)
    {
        logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId, command.promised(), ballot);
        return AcceptOutcome.RejectedBallot;
    }

    if (command.hasBeen(PreCommitted))
    {
        logger.trace("{}: skipping accept - already committed ({})", txnId, command.status());
        return AcceptOutcome.Redundant;
    }

    Ranges coordinateRanges = coordinateRanges(safeStore, command);
    // if executeAt is in a later epoch than txnId, deps are recorded over the ranges between the two epochs
    Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
    Invariants.checkState(!acceptRanges.isEmpty());

    CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
    ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);
    if (!validate(command.status(), attrs, coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
    {
        throw new AssertionError("Invalid response from validate function");
    }

    // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
    //  distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
    //  recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
    attrs = set(safeStore, command, attrs, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);

    // set only registers by transaction keys, which we mightn't already have received
    if (!command.known().isDefinitionKnown())
        safeStore.register(keys, acceptRanges, command);

    command = safeCommand.accept(attrs, executeAt, ballot);
    safeStore.progressLog().accepted(command, shard);
    safeStore.notifyListeners(safeCommand);

    return AcceptOutcome.Success;
}
| |
| public static AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, SafeCommand safeCommand, Ballot ballot) |
| { |
| Command command = safeCommand.current(); |
| if (command.promised().compareTo(ballot) > 0) |
| { |
| logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", command.txnId(), command.promised(), ballot); |
| return AcceptOutcome.RejectedBallot; |
| } |
| |
| if (command.hasBeen(PreCommitted)) |
| { |
| logger.trace("{}: skipping accept invalidated - already committed ({})", command.txnId(), command.status()); |
| return AcceptOutcome.Redundant; |
| } |
| |
| logger.trace("{}: accepted invalidated", command.txnId()); |
| |
| safeCommand.acceptInvalidated(ballot); |
| safeStore.notifyListeners(safeCommand); |
| return AcceptOutcome.Success; |
| } |
| |
/**
 * Outcome of a commit request.
 */
public enum CommitOutcome
{
    /** the command was committed */
    Success,
    /** the command had already been committed (or progressed further) */
    Redundant,
    /** the supplied information did not pass validation, so the command could not be committed */
    Insufficient
}
| |
| |
// relies on mutual exclusion for each key
/**
 * Commit {@code txnId} with the given {@code executeAt} and {@code partialDeps}.
 *
 * <p>If the command was already pre-committed with a different executeAt, or was invalidated, the agent is informed.
 * If it is already Committed (or later) the request is {@link CommitOutcome#Redundant}. Otherwise the route,
 * partial txn and deps are validated and recorded, and the command's WaitingOn is computed from
 * {@code partialDeps}. The command is then committed, the progress log is informed, and execution is attempted.
 *
 * @return {@link CommitOutcome#Insufficient} if validation fails (the home/progress keys gathered so far are still
 *         recorded), otherwise {@link CommitOutcome#Redundant} or {@link CommitOutcome#Success} as above
 */
public static CommitOutcome commit(SafeCommandStore safeStore, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
{
    SafeCommand safeCommand = safeStore.command(txnId);
    Command command = safeCommand.current();

    if (command.hasBeen(PreCommitted))
    {
        logger.trace("{}: skipping commit - already committed ({})", txnId, command.status());
        if (!executeAt.equals(command.executeAt()) || command.status() == Invalidated)
            safeStore.agent().onInconsistentTimestamp(command, (command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt);

        // a merely PreCommitted command falls through and is committed below
        if (command.hasBeen(Committed))
            return CommitOutcome.Redundant;
    }

    Ranges coordinateRanges = coordinateRanges(safeStore, command);
    // TODO (expected, consider): consider ranges between coordinateRanges and executeRanges? Perhaps don't need them
    Ranges executeRanges = executeRanges(safeStore, executeAt);

    CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
    ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);

    if (!validate(command.status(), attrs, coordinateRanges, executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
    {
        // persist what we have learned (home/progress keys) even though we cannot commit yet
        safeCommand.updateAttributes(attrs);
        return CommitOutcome.Insufficient;
    }

    // FIXME: split up set
    attrs = set(safeStore, command, attrs, coordinateRanges, executeRanges, shard, route, partialTxn, Add, partialDeps, Set);

    logger.trace("{}: committed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);
    WaitingOn waitingOn = populateWaitingOn(safeStore, txnId, executeAt, partialDeps);
    command = safeCommand.commit(attrs, executeAt, waitingOn);

    safeStore.progressLog().committed(command, shard);

    // TODO (expected, safety): introduce intermediate status to avoid reentry when notifying listeners (which might notify us)
    maybeExecute(safeStore, safeCommand, shard, true, true);
    return CommitOutcome.Success;
}
| |
| // relies on mutual exclusion for each key |
| public static void precommit(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt) |
| { |
| SafeCommand safeCommand = safeStore.command(txnId); |
| Command command = safeCommand.current(); |
| if (command.hasBeen(PreCommitted)) |
| { |
| logger.trace("{}: skipping precommit - already committed ({})", txnId, command.status()); |
| if (executeAt.equals(command.executeAt()) && command.status() != Invalidated) |
| return; |
| |
| safeStore.agent().onInconsistentTimestamp(command, (command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt); |
| } |
| |
| safeCommand.precommit(executeAt); |
| safeStore.notifyListeners(safeCommand); |
| logger.trace("{}: precommitted with executeAt: {}", txnId, executeAt); |
| } |
| |
/**
 * Compute the initial {@link WaitingOn} for {@code txnId} committed at {@code executeAt}. Only those of
 * {@code partialDeps} that intersect this store's ranges since {@code executeAt.epoch()} are considered, and a
 * listener for {@code txnId} is registered on every considered dependency except invalidated ones.
 *
 * <ul>
 *   <li>dependencies not loaded in this store are treated as waiting on commit;</li>
 *   <li>loaded dependencies that have not yet committed (NotWitnessed up to PreCommitted) are waiting on commit;</li>
 *   <li>Committed, ReadyToExecute, PreApplied and Applied dependencies are passed to {@code insertPredecessor};</li>
 *   <li>Invalidated dependencies are ignored.</li>
 * </ul>
 *
 * @return {@link WaitingOn#EMPTY} if this store has no ranges since {@code executeAt.epoch()}
 * @throws IllegalStateException if a dependency has a status not listed above
 */
protected static WaitingOn populateWaitingOn(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, PartialDeps partialDeps)
{
    Ranges ranges = safeStore.ranges().since(executeAt.epoch());
    if (ranges == null)
        return WaitingOn.EMPTY;

    WaitingOn.Update update = new WaitingOn.Update();
    partialDeps.forEach(ranges, depId -> {
        SafeCommand safeCommand = safeStore.ifLoaded(depId);
        if (safeCommand == null)
        {
            // dependency is not loaded, so we cannot inspect it; wait for its commit and register as a listener
            update.addWaitingOnCommit(depId);
            safeStore.addAndInvokeListener(depId, txnId);
        }
        else
        {
            Command command = safeCommand.current();
            switch (command.status())
            {
                default:
                    throw new IllegalStateException();
                case NotWitnessed:
                case PreAccepted:
                case Accepted:
                case AcceptedInvalidate:
                case PreCommitted:
                    // we don't know when these dependencies will execute, and cannot execute until we do

                    command = safeCommand.addListener(new Command.Listener(txnId));
                    update.addWaitingOnCommit(command.txnId());
                    break;
                case Committed:
                    // TODO (desired, efficiency): split into ReadyToRead and ReadyToWrite;
                    //                             the distributed read can be performed as soon as those keys are ready,
                    //                             and in parallel with any other reads. the client can even ACK immediately after;
                    //                             only the write needs to be postponed until other in-progress reads complete
                case ReadyToExecute:
                case PreApplied:
                case Applied:
                    command = safeCommand.addListener(new Command.Listener(txnId));
                    insertPredecessor(txnId, executeAt, update, command);
                    // intentional fall-through to the shared break below
                case Invalidated:
                    break;
            }
        }
    });
    return update.build();
}
| |
| // TODO (expected, ?): commitInvalidate may need to update cfks _if_ possible |
| public static void commitInvalidate(SafeCommandStore safeStore, TxnId txnId) |
| { |
| SafeCommand safeCommand = safeStore.command(txnId); |
| Command command = safeCommand.current(); |
| if (command.hasBeen(PreCommitted)) |
| { |
| logger.trace("{}: skipping commit invalidated - already committed ({})", txnId, command.status()); |
| if (!command.hasBeen(Invalidated)) |
| safeStore.agent().onInconsistentTimestamp(command, Timestamp.NONE, command.executeAt()); |
| |
| return; |
| } |
| |
| ProgressShard shard = progressShard(safeStore, command); |
| safeStore.progressLog().invalidated(command, shard); |
| |
| CommonAttributes attrs = command; |
| if (command.partialDeps() == null) |
| attrs = attrs.mutable().partialDeps(PartialDeps.NONE); |
| safeCommand.commitInvalidated(attrs, txnId); |
| logger.trace("{}: committed invalidated", txnId); |
| |
| safeStore.notifyListeners(safeCommand); |
| } |
| |
/**
 * Outcome of an apply request.
 */
public enum ApplyOutcome
{
    /** the outcome was recorded and execution attempted */
    Success,
    /** the command had already been applied with the same executeAt */
    Redundant,
    /** the supplied information did not pass validation */
    Insufficient
}
| |
| |
/**
 * Record the outcome ({@code writes} and {@code result}) of {@code txnId} executed at {@code executeAt}.
 *
 * <p>Returns {@link ApplyOutcome#Redundant} if the command has already been PreApplied with the same executeAt.
 * If the command was pre-committed with a different executeAt, the agent is informed but processing continues.
 * For a command that is not yet committed, {@code partialDeps} are used to compute its WaitingOn; otherwise the
 * committed WaitingOn is reused. The command is then marked PreApplied and listeners are notified. Finally,
 * execution is attempted and the progress log is told that the command executed.
 *
 * @param untilEpoch when older than the store's latest epoch, used to check that the execute ranges are covered by
 *                   the ranges between {@code executeAt.epoch()} and {@code untilEpoch}
 * @return {@link ApplyOutcome#Insufficient} if validation fails (the attributes gathered so far are still recorded),
 *         otherwise {@link ApplyOutcome#Redundant} or {@link ApplyOutcome#Success} as above
 */
public static ApplyOutcome apply(SafeCommandStore safeStore, TxnId txnId, long untilEpoch, Route<?> route, Timestamp executeAt, @Nullable PartialDeps partialDeps, Writes writes, Result result)
{
    SafeCommand safeCommand = safeStore.command(txnId);
    Command command = safeCommand.current();
    if (command.hasBeen(PreApplied) && executeAt.equals(command.executeAt()))
    {
        logger.trace("{}: skipping apply - already executed ({})", txnId, command.status());
        return ApplyOutcome.Redundant;
    }
    else if (command.hasBeen(PreCommitted) && !executeAt.equals(command.executeAt()))
    {
        safeStore.agent().onInconsistentTimestamp(command, command.executeAt(), executeAt);
    }

    Ranges coordinateRanges = coordinateRanges(safeStore, command);
    Ranges executeRanges = executeRanges(safeStore, executeAt);
    if (untilEpoch < safeStore.latestEpoch())
    {
        Ranges expectedRanges = safeStore.ranges().between(executeAt.epoch(), untilEpoch);
        Invariants.checkState(expectedRanges.containsAll(executeRanges));
    }

    CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, coordinateRanges);
    ProgressShard shard = progressShard(attrs, coordinateRanges);

    // deps are only added to an already committed command; otherwise we try to set them from partialDeps
    if (!validate(command.status(), attrs, coordinateRanges, executeRanges, shard, route, Check, null, Check, partialDeps, command.hasBeen(Committed) ? Add : TrySet))
    {
        safeCommand.updateAttributes(attrs);
        return ApplyOutcome.Insufficient; // TODO (expected, consider): this should probably be an assertion failure if !TrySet
    }

    WaitingOn waitingOn = !command.hasBeen(Committed) ? populateWaitingOn(safeStore, txnId, executeAt, partialDeps) : command.asCommitted().waitingOn();
    attrs = set(safeStore, command, attrs, coordinateRanges, executeRanges, shard, route, null, Check, partialDeps, command.hasBeen(Committed) ? Add : TrySet);

    safeCommand.preapplied(attrs, executeAt, waitingOn, writes, result);
    safeStore.notifyListeners(safeCommand);
    logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);

    maybeExecute(safeStore, safeCommand, shard, true, true);
    safeStore.progressLog().executed(safeCommand.current(), shard);

    return ApplyOutcome.Success;
}
| |
| public static void listenerUpdate(SafeCommandStore safeStore, SafeCommand safeListener, SafeCommand safeUpdated) |
| { |
| Command listener = safeListener.current(); |
| Command updated = safeUpdated.current(); |
| logger.trace("{}: updating as listener in response to change on {} with status {} ({})", |
| listener.txnId(), updated.txnId(), updated.status(), updated); |
| switch (updated.status()) |
| { |
| default: |
| throw new IllegalStateException("Unexpected status: " + updated.status()); |
| case NotWitnessed: |
| case PreAccepted: |
| case Accepted: |
| case AcceptedInvalidate: |
| break; |
| |
| case PreCommitted: |
| case Committed: |
| case ReadyToExecute: |
| case PreApplied: |
| case Applied: |
| case Invalidated: |
| updatePredecessorAndMaybeExecute(safeStore, safeListener, safeUpdated, true); |
| break; |
| } |
| } |
| |
| protected static void postApply(SafeCommandStore safeStore, TxnId txnId) |
| { |
| logger.trace("{} applied, setting status to Applied and notifying listeners", txnId); |
| SafeCommand safeCommand = safeStore.command(txnId); |
| safeCommand.applied(); |
| safeStore.notifyListeners(safeCommand); |
| } |
| |
| private static AsyncChain<Void> applyChain(SafeCommandStore safeStore, PreLoadContext context, TxnId txnId) |
| { |
| Command.Executed command = safeStore.command(txnId).current().asExecuted(); |
| if (command.hasBeen(Applied)) |
| return AsyncChains.success(null); |
| CommandStore unsafeStore = safeStore.commandStore(); |
| return command.writes().apply(safeStore) |
| .flatMap(unused -> unsafeStore.submit(context, ss -> { |
| postApply(ss, txnId); |
| return null; |
| })); |
| } |
| |
| private static void apply(SafeCommandStore safeStore, Command.Executed command) |
| { |
| CommandStore unsafeStore = safeStore.commandStore(); |
| TxnId txnId = command.txnId(); |
| PreLoadContext context = command.contextForSelf(); |
| // this is sometimes called from a listener update, which will not have the keys in context |
| if (safeStore.canExecuteWith(context)) |
| { |
| applyChain(safeStore, context, txnId).begin(safeStore.agent()); |
| } |
| else |
| { |
| unsafeStore.submit(context, ss -> { |
| applyChain(ss, context, txnId).begin(ss.agent()); |
| return null; |
| }).begin(safeStore.agent()); |
| } |
| } |
| |
// TODO (expected, API consistency): maybe split into maybeExecute and maybeApply?
/**
 * Try to advance a Committed or PreApplied command whose dependencies are all satisfied.
 * A Committed command becomes ReadyToExecute. For a PreApplied command, if its writes intersect this store's
 * execute ranges the asynchronous apply is started; otherwise it is marked Applied immediately as a no-op.
 *
 * @param alwaysNotifyListeners if true, listeners are notified even when no transition is made
 * @param notifyWaitingOn if true and the command is still waiting on a dependency, a {@link NotifyWaitingOn} walk
 *                        is run to inform the progress log what we are blocked on
 * @return true iff a transition was made (or the asynchronous apply was started)
 */
private static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, ProgressShard shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
{
    Command command = safeCommand.current();
    if (logger.isTraceEnabled())
        logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", command.txnId(), command.status(), alwaysNotifyListeners);

    // only Committed and PreApplied commands can make progress here
    if (command.status() != Committed && command.status() != PreApplied)
    {
        if (alwaysNotifyListeners)
            safeStore.notifyListeners(safeCommand);
        return false;
    }

    if (command.asCommitted().isWaitingOnDependency())
    {
        if (alwaysNotifyListeners)
            safeStore.notifyListeners(safeCommand);

        if (notifyWaitingOn)
            new NotifyWaitingOn(command.txnId()).accept(safeStore);
        return false;
    }

    // FIXME: need to communicate to caller that we didn't execute if we take one of the above paths

    switch (command.status())
    {
        case Committed:
            // TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
            command = safeCommand.readyToExecute();
            logger.trace("{}: set to ReadyToExecute", command.txnId());
            safeStore.progressLog().readyToExecute(command, shard);
            safeStore.notifyListeners(safeCommand);
            return true;

        case PreApplied:
            Ranges executeRanges = executeRanges(safeStore, command.executeAt());
            Command.Executed executed = command.asExecuted();
            // only writes intersecting this store's execute ranges need to be applied here
            boolean intersects = executed.writes().keys.intersects(executeRanges);

            if (intersects)
            {
                logger.trace("{}: applying", command.txnId());
                apply(safeStore, executed);
                return true;
            }
            else
            {
                // TODO (desirable, performance): This could be performed immediately upon Committed
                //      but: if we later support transitive dependency elision this could be dangerous
                logger.trace("{}: applying no-op", command.txnId());
                safeCommand.applied();
                safeStore.notifyListeners(safeCommand);
                return true;
            }
        default:
            throw new IllegalStateException();
    }
}
| |
/**
 * Update {@code waitingOn}, the pending WaitingOn of the committed {@code safeCommand}, in response to a change in
 * {@code safeDependency}. Our listener is removed from the dependency once the dependency can no longer block us
 * (invalidated, executes after us, or applied).
 *
 * @param safeDependency is either committed or invalidated (checked: it must have at least been PreCommitted)
 * @return true iff {@code maybeExecute} might now have a different outcome
 * @throws IllegalStateException if the dependency may still block us but {@code safeCommand} is not waiting on any dependency
 */
private static boolean updatePredecessor(SafeCommand safeCommand, WaitingOn.Update waitingOn, SafeCommand safeDependency)
{
    Command.Committed command = safeCommand.current().asCommitted();
    Command dependency = safeDependency.current();
    Invariants.checkState(dependency.hasBeen(PreCommitted));
    if (dependency.hasBeen(Invalidated))
    {
        logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", command.txnId(), dependency.txnId());
        safeDependency.removeListener(command.asListener());
        waitingOn.removeWaitingOnCommit(dependency.txnId());
        return true;
    }
    else if (dependency.executeAt().compareTo(command.executeAt()) > 0)
    {
        // dependency cannot be a predecessor if it executes later
        logger.trace("{}: {} executes after us. Stop listening and removing from waiting on apply set.", command.txnId(), dependency.txnId());
        waitingOn.removeWaitingOn(dependency.txnId(), dependency.executeAt());
        safeDependency.removeListener(command.asListener());
        return true;
    }
    else if (dependency.hasBeen(Applied))
    {
        logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", command.txnId(), dependency.txnId());
        waitingOn.removeWaitingOn(dependency.txnId(), dependency.executeAt());
        safeDependency.removeListener(command.asListener());
        return true;
    }
    else if (command.isWaitingOnDependency())
    {
        // dependency executes before us and is not yet applied: move it from waiting-on-commit to waiting-on-apply
        logger.trace("{}: adding {} to waiting on apply set.", command.txnId(), dependency.txnId());
        waitingOn.addWaitingOnApply(dependency.txnId(), dependency.executeAt());
        waitingOn.removeWaitingOnCommit(dependency.txnId());
        return false;
    }
    else
    {
        throw new IllegalStateException();
    }
}
| |
| private static void insertPredecessor(TxnId txnId, Timestamp executeAt, WaitingOn.Update update, Command dependency) |
| { |
| Invariants.checkState(dependency.hasBeen(Committed)); |
| if (dependency.hasBeen(Invalidated)) |
| { |
| logger.trace("{}: {} is invalidated. Do not insert.", txnId, dependency.txnId()); |
| } |
| else if (dependency.executeAt().compareTo(executeAt) > 0) |
| { |
| // dependency cannot be a predecessor if it executes later |
| logger.trace("{}: {} executes after us. Do not insert.", txnId, dependency.txnId()); |
| } |
| else if (dependency.hasBeen(Applied)) |
| { |
| logger.trace("{}: {} has been applied. Do not insert.", txnId, dependency.txnId()); |
| } |
| else |
| { |
| logger.trace("{}: adding {} to waiting on apply set.", txnId, dependency.txnId()); |
| update.addWaitingOnApply(dependency.txnId(), dependency.executeAt()); |
| } |
| } |
| |
| static void updatePredecessorAndMaybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, SafeCommand livePredecessor, boolean notifyWaitingOn) |
| { |
| Command.Committed command = safeCommand.current().asCommitted(); |
| if (command.hasBeen(Applied)) |
| return; |
| |
| WaitingOn.Update waitingOn = new WaitingOn.Update(command); |
| boolean attemptExecution = updatePredecessor(safeCommand, waitingOn, livePredecessor); |
| command = safeCommand.updateWaitingOn(waitingOn); |
| |
| if (attemptExecution) |
| maybeExecute(safeStore, safeCommand, progressShard(safeStore, command), false, notifyWaitingOn); |
| } |
| |
| // TODO (now): check/move methods below |
| private static Command setDurability(SafeCommandStore safeStore, SafeCommand safeCommand, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt) |
| { |
| Command command = safeCommand.current(); |
| CommonAttributes attrs = updateHomeKey(safeStore, command.txnId(), command, homeKey); |
| if (executeAt != null && command.status().hasBeen(Committed) && !command.asCommitted().executeAt().equals(executeAt)) |
| safeStore.agent().onInconsistentTimestamp(command, command.asCommitted().executeAt(), executeAt); |
| attrs = attrs.mutable().durability(durability); |
| return safeCommand.updateAttributes(attrs); |
| } |
| |
| public static Command setDurability(SafeCommandStore safeStore, TxnId txnId, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt) |
| { |
| return setDurability(safeStore, safeStore.command(txnId), durability, homeKey, executeAt); |
| } |
| |
| private static TxnId firstWaitingOnCommit(Command command) |
| { |
| if (!command.hasBeen(Committed)) |
| return null; |
| |
| Command.Committed committed = command.asCommitted(); |
| return committed.isWaitingOnCommit() ? committed.waitingOnCommit().first() : null; |
| } |
| |
| private static TxnId firstWaitingOnApply(Command command, @Nullable TxnId ifExecutesBefore) |
| { |
| if (!command.hasBeen(Committed)) |
| return null; |
| |
| Command.Committed committed = command.asCommitted(); |
| if (!committed.isWaitingOnApply()) |
| return null; |
| |
| Map.Entry<Timestamp, TxnId> first = committed.waitingOnApply().firstEntry(); |
| if (ifExecutesBefore == null || first.getKey().compareTo(ifExecutesBefore) < 0) |
| return first.getValue(); |
| |
| return null; |
| } |
| |
/**
 * Walks the chain of transactions blocking a command, so the progress log can be told which transaction (and which
 * {@link Known} state of it) the command is ultimately waiting on. Blockers that turn out to be resolved are used to
 * update their waiter's WaitingOn along the way.
 *
 * <p>{@code txnIds[0..depth]} is a stack: {@code txnIds[0]} is the command we started from, and each later entry is
 * a transaction that the entry below it is directly waiting on. {@code blockedUntil[i]} is the state
 * {@code txnIds[i]} must reach to stop blocking {@code txnIds[i - 1]}. The instance is also its own
 * {@link PreLoadContext}, so that if a stacked command is not loaded the walk can be re-submitted to the command
 * store with every stacked txnId.
 */
static class NotifyWaitingOn implements PreLoadContext, Consumer<SafeCommandStore>
{
    // state each stacked txn must reach before the txn beneath it is unblocked; grown alongside txnIds
    Known[] blockedUntil = new Known[4];
    // the stack of txnIds; index 0 is the command we started from
    TxnId[] txnIds = new TxnId[4];
    // index of the top of the stack
    int depth;

    public NotifyWaitingOn(TxnId txnId)
    {
        txnIds[0] = txnId;
        blockedUntil[0] = Known.Done;
    }

    @Override
    public void accept(SafeCommandStore safeStore)
    {
        // prevSafe is the command waiting on the current top of the stack (null at the root)
        SafeCommand prevSafe = get(safeStore, depth - 1);
        while (depth >= 0)
        {
            Command prev = prevSafe != null ? prevSafe.current() : null;
            SafeCommand curSafe = safeStore.ifLoaded(txnIds[depth]);
            Command cur = curSafe != null ? curSafe.current() : null;
            Known until = blockedUntil[depth];
            if (cur == null)
            {
                // need to load; schedule execution for later
                safeStore.commandStore().execute(this, this).begin(safeStore.agent());
                return;
            }

            if (prev != null)
            {
                // the blocker has reached the required state, or executes after its waiter and so cannot block it:
                // update the waiter (which may let it execute) and pop the blocker off the stack
                if (cur.has(until) || (cur.hasBeen(PreCommitted) && cur.executeAt().compareTo(prev.executeAt()) > 0))
                {
                    updatePredecessorAndMaybeExecute(safeStore, prevSafe, curSafe, false);
                    --depth;
                    prevSafe = get(safeStore, depth - 1);
                    continue;
                }
            }
            else if (cur.has(until))
            {
                // we're done; have already applied
                Invariants.checkState(depth == 0);
                break;
            }

            // descend into cur's first blocker. An earliest waiting-on-apply dependency takes precedence when it
            // executes before the first waiting-on-commit txnId (or when nothing is waiting on commit).
            TxnId directlyBlockedOnCommit = firstWaitingOnCommit(cur);
            TxnId directlyBlockedOnApply = firstWaitingOnApply(cur, directlyBlockedOnCommit);
            if (directlyBlockedOnApply != null)
            {
                push(directlyBlockedOnApply, Known.Done);
            }
            else if (directlyBlockedOnCommit != null)
            {
                push(directlyBlockedOnCommit, ExecuteAtOnly);
            }
            else
            {
                // cur is not waiting on anything: if it is committed but not yet ReadyToExecute, it can execute now
                if (cur.hasBeen(Committed) && !cur.hasBeen(ReadyToExecute) && !cur.asCommitted().isWaitingOnDependency())
                {
                    if (!maybeExecute(safeStore, curSafe, progressShard(safeStore, cur), false, false))
                        throw new AssertionError("Is able to Apply, but has not done so");
                    // loop and re-test the command's status; we may still want to notify blocking, esp. if not homeShard
                    continue;
                }

                // otherwise cur is the transaction we are ultimately blocked on; report it to the progress log
                Unseekables<?, ?> someKeys = cur.maxUnseekables();
                if (someKeys == null && prev != null) someKeys = prev.partialDeps().someUnseekables(cur.txnId());
                Invariants.checkState(someKeys != null);
                logger.trace("{} blocked on {} until {}", txnIds[0], cur.txnId(), until);
                safeStore.progressLog().waiting(cur.txnId(), until, someKeys);
                return;
            }
            // we pushed a new blocker; cur becomes its waiter
            prevSafe = curSafe;
        }
    }

    // SafeCommand for stack entry i, or null when i < 0 (i.e. there is no waiter below the root)
    private SafeCommand get(SafeCommandStore safeStore, int i)
    {
        return i >= 0 ? safeStore.command(txnIds[i]) : null;
    }

    // push a blocker and the state it must reach; both arrays are doubled when full
    void push(TxnId by, Known until)
    {
        if (++depth == txnIds.length)
        {
            txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
            blockedUntil = Arrays.copyOf(blockedUntil, txnIds.length);
        }
        txnIds[depth] = by;
        blockedUntil[depth] = until;
    }

    // PreLoadContext: the txnIds currently on the stack
    @Override
    public Iterable<TxnId> txnIds()
    {
        return Arrays.asList(txnIds).subList(0, depth + 1);
    }

    // PreLoadContext: no keys are required
    @Override
    public Seekables<?, ?> keys()
    {
        return Keys.EMPTY;
    }
}
| |
| public static Command updateHomeKey(SafeCommandStore safeStore, SafeCommand safeCommand, RoutingKey homeKey) |
| { |
| Command command = safeCommand.current(); |
| CommonAttributes attrs = updateHomeKey(safeStore, command.txnId(), command, homeKey); |
| return safeCommand.updateAttributes(attrs); |
| } |
| |
| /** |
| * A key nominated to represent the "home" shard - only members of the home shard may be nominated to recover |
| * a transaction, to reduce the cluster-wide overhead of ensuring progress. A transaction that has only been |
| * witnessed at PreAccept may however trigger a process of ensuring the home shard is durably informed of |
| * the transaction. |
| * |
| * Note that for ProgressLog purposes the "home shard" is the shard as of txnId.epoch. |
| * For recovery purposes the "home shard" is as of txnId.epoch until Committed, and executeAt.epoch once Executed |
| */ |
| public static CommonAttributes updateHomeKey(SafeCommandStore safeStore, TxnId txnId, CommonAttributes attrs, RoutingKey homeKey) |
| { |
| if (attrs.homeKey() == null) |
| { |
| attrs = attrs.mutable().homeKey(homeKey); |
| // TODO (low priority, safety): if we're processed on a node that does not know the latest epoch, |
| // do we guarantee the home key calculation is unchanged since the prior epoch? |
| if (attrs.progressKey() == null && owns(safeStore, txnId.epoch(), homeKey)) |
| attrs = attrs.mutable().progressKey(homeKey); |
| } |
| else if (!attrs.homeKey().equals(homeKey)) |
| { |
| throw new IllegalStateException(); |
| } |
| return attrs; |
| } |
| |
| private static CommonAttributes updateHomeAndProgressKeys(SafeCommandStore safeStore, TxnId txnId, CommonAttributes attrs, Route<?> route, @Nullable RoutingKey progressKey, Ranges coordinateRanges) |
| { |
| attrs = updateHomeKey(safeStore, txnId, attrs, route.homeKey()); |
| if (progressKey == null || progressKey == NO_PROGRESS_KEY) |
| { |
| if (attrs.progressKey() == null) |
| attrs = attrs.mutable().progressKey(NO_PROGRESS_KEY); |
| return attrs; |
| } |
| if (attrs.progressKey() == null) attrs = attrs.mutable().progressKey(progressKey); |
| else if (!attrs.progressKey().equals(progressKey)) |
| throw new AssertionError(); |
| return attrs; |
| } |
| |
| private static CommonAttributes updateHomeAndProgressKeys(SafeCommandStore safeStore, TxnId txnId, CommonAttributes attrs, Route<?> route, Ranges coordinateRanges) |
| { |
| if (attrs.progressKey() == null) |
| return attrs; |
| |
| return updateHomeAndProgressKeys(safeStore, txnId, attrs, route, attrs.progressKey(), coordinateRanges); |
| } |
| |
| private static ProgressShard progressShard(CommonAttributes attrs, @Nullable RoutingKey progressKey, Ranges coordinateRanges) |
| { |
| if (progressKey == null || progressKey == NO_PROGRESS_KEY) |
| return No; |
| |
| if (!coordinateRanges.contains(progressKey)) |
| return No; |
| |
| return progressKey.equals(attrs.homeKey()) ? Home : Local; |
| } |
| |
| private static ProgressShard progressShard(CommonAttributes attrs, Ranges coordinateRanges) |
| { |
| if (attrs.progressKey() == null) |
| return Unsure; |
| |
| return progressShard(attrs, attrs.progressKey(), coordinateRanges); |
| } |
| |
| private static ProgressShard progressShard(SafeCommandStore safeStore, Command command) |
| { |
| RoutingKey progressKey = command.progressKey(); |
| if (progressKey == null) |
| return Unsure; |
| |
| if (progressKey == noProgressKey()) |
| return No; |
| |
| Ranges coordinateRanges = safeStore.ranges().at(command.txnId().epoch()); |
| if (!coordinateRanges.contains(progressKey)) |
| return No; |
| |
| return progressKey.equals(command.homeKey()) ? Home : Local; |
| } |
| |
| |
| private static Ranges coordinateRanges(SafeCommandStore safeStore, Command command) |
| { |
| return safeStore.ranges().at(command.txnId().epoch()); |
| } |
| |
| private static Ranges executeRanges(SafeCommandStore safeStore, Timestamp executeAt) |
| { |
| return safeStore.ranges().since(executeAt.epoch()); |
| } |
| |
/**
 * How {@code validate} and {@code set} treat an incoming route / partialTxn / partialDeps value relative to what is
 * already recorded. Declaration order is unchanged.
 */
enum EnsureAction
{
    /** Neither validated nor updated. */
    Ignore,
    /** Validated only; insufficient coverage of additional ranges is reported as {@code false}, and the field is not updated. */
    Check,
    /** Merged with any existing value (or set if none exists); insufficient coverage of additional ranges is an error. */
    Add,
    /** Replaced if the incoming value is sufficient; insufficient coverage is reported as {@code false}. */
    TrySet,
    /** Replaced; an incoming value with insufficient coverage is an error. */
    Set
}
| |
/**
 * Records the route, partialTxn and partialDeps into {@code attrs} according to the requested {@link EnsureAction}s,
 * registering newly-recorded transaction keys with {@code safeStore}.
 * <p>
 * All incoming values are sliced to {@code allRanges}, the combination of {@code existingRanges} and
 * {@code additionalRanges}. The route is merged with the existing route when {@code shard.isProgress()}; otherwise
 * it is replaced by the sliced {@code route}.
 * <p>
 * For partialTxn and partialDeps:
 * <ul>
 *   <li>{@code Add} with a null incoming value is a no-op;</li>
 *   <li>{@code Add} merges the incoming value with an existing one, or behaves like {@code Set} if none is recorded;</li>
 *   <li>{@code Set}/{@code TrySet} replace the field with the sliced incoming value;</li>
 *   <li>{@code Ignore}/{@code Check} leave the field untouched.</li>
 * </ul>
 * NOTE(review): {@code Set}/{@code TrySet} dereference {@code partialTxn}/{@code partialDeps} unconditionally, so
 * callers presumably run {@code validate(...)} first to guarantee they are non-null — confirm.
 *
 * @return the updated attributes
 */
private static CommonAttributes set(SafeCommandStore safeStore, Command command, CommonAttributes attrs,
                                    Ranges existingRanges, Ranges additionalRanges, ProgressShard shard, Route<?> route,
                                    @Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
                                    @Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
    // a progress key (possibly the no-progress sentinel) must already have been decided
    Invariants.checkState(attrs.progressKey() != null);
    Ranges allRanges = existingRanges.with(additionalRanges);

    if (shard.isProgress())
        attrs = attrs.mutable().route(Route.merge(attrs.route(), (Route)route));
    else
        attrs = attrs.mutable().route(route.slice(allRanges));

    // TODO (soon): stop round-robin hashing; partition only on ranges
    switch (ensurePartialTxn)
    {
        case Add:
            if (partialTxn == null)
                break;

            if (attrs.partialTxn() != null)
            {
                partialTxn = partialTxn.slice(allRanges, shard.isHome());
                // register keys of the incoming txn that the existing partialTxn does not already have
                // (presumably what Routables.foldlMissing visits)
                Routables.foldlMissing((Seekables)partialTxn.keys(), attrs.partialTxn().keys(), (keyOrRange, p, v, i) -> {
                    // TODO (expected, efficiency): we may register the same ranges more than once
                    safeStore.register(keyOrRange, allRanges, command);
                    return v;
                }, 0, 0, 1);
                attrs = attrs.mutable().partialTxn(attrs.partialTxn().with(partialTxn));
                break;
            }
            // intentional fall-through: nothing recorded yet, so Add behaves like Set

        case Set:
        case TrySet:
            // partialTxn is reassigned to its sliced form so that the registration below uses the sliced keys
            attrs = attrs.mutable().partialTxn(partialTxn = partialTxn.slice(allRanges, shard.isHome()));
            // TODO (expected, efficiency): we may register the same ranges more than once
            // TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
            safeStore.register(partialTxn.keys(), allRanges, command);
            break;
    }

    switch (ensurePartialDeps)
    {
        case Add:
            if (partialDeps == null)
                break;

            if (attrs.partialDeps() != null)
            {
                attrs = attrs.mutable().partialDeps(attrs.partialDeps().with(partialDeps.slice(allRanges)));
                break;
            }
            // intentional fall-through: nothing recorded yet, so Add behaves like Set

        case Set:
        case TrySet:
            attrs = attrs.mutable().partialDeps(partialDeps.slice(allRanges));
            break;
    }
    return attrs;
}
| |
/**
 * Checks whether we have sufficient information to update the route, partialTxn and partialDeps fields for the
 * requested actions. This method does not modify {@code attrs}; the update itself is performed separately.
 *
 * @return true if the existing and/or incoming information is sufficient; false if information is missing
 *         (including when {@code shard} is {@code Unsure})
 * @throws IllegalArgumentException if an incomplete route is supplied where a complete one is required, if
 *         {@code partialTxn}'s kind differs from the kind encoded in the TxnId, or if an incomplete txn/deps is
 *         supplied for an action that requires it to be complete
 * @throws IllegalStateException if, on the home shard, neither the existing nor the incoming partialTxn has a query
 * @throws AssertionError if {@code ensureRoute} is not handled for the home shard
 */
private static boolean validate(Status status, CommonAttributes attrs, Ranges existingRanges, Ranges additionalRanges, ProgressShard shard,
                                Route<?> route, EnsureAction ensureRoute,
                                @Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
                                @Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
    // without a known shard role we cannot choose which route rules below apply
    if (shard == Unsure)
        return false;

    // first validate route
    if (shard.isHome())
    {
        // unless ensureRoute is Ignore, the home shard needs a full route; ensureRoute decides whether its absence
        // means "not yet" (return false) or an illegal argument (throw)
        switch (ensureRoute)
        {
            default: throw new AssertionError();
            case Check:
                if (!isFullRoute(attrs.route()) && !isFullRoute(route))
                    return false;
                // fall through: at least one of the existing or incoming routes is full
            case Ignore:
                break;
            case Add:
            case Set:
                if (!isFullRoute(route))
                    throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
                break;
            case TrySet:
                if (!isFullRoute(route))
                    return false;
        }
    }
    else
    {
        // a route that does not cover the existing ranges means we lack information (return false);
        // a route that does not cover the additional ranges is an illegal argument
        if (!route.covers(existingRanges))
            return false;

        // identity comparison: additionalRanges is presumably the same instance as existingRanges when nothing is being added
        if (existingRanges != additionalRanges && !route.covers(additionalRanges))
            throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
    }

    // invalid to Add deps to Accepted or AcceptedInvalidate statuses, as Committed deps are not equivalent
    // and we may erroneously believe we have covered a wider range than we have in fact covered
    if (ensurePartialDeps == Add)
        Invariants.checkState(status != Accepted && status != AcceptedInvalidate);

    // validate new partial txn
    if (!validate(ensurePartialTxn, existingRanges, additionalRanges, covers(attrs.partialTxn()), covers(partialTxn), "txn", partialTxn))
        return false;

    // the supplied txn must agree with the kind encoded in the TxnId, when the TxnId encodes one
    if (partialTxn != null && attrs.txnId().rw() != null && !attrs.txnId().rw().equals(partialTxn.kind()))
        throw new IllegalArgumentException("Transaction has different kind to its TxnId");

    // on the home shard, either the existing or the incoming partialTxn must carry a query
    if (shard.isHome() && ensurePartialTxn != Ignore)
    {
        if (!hasQuery(attrs.partialTxn()) && !hasQuery(partialTxn))
            throw new IllegalStateException();
    }

    // finally validate partial deps
    return validate(ensurePartialDeps, existingRanges, additionalRanges, covers(attrs.partialDeps()), covers(partialDeps), "deps", partialDeps);
}
| |
| // FIXME (immutable-state): has this been removed? |
/**
 * Checks whether the coverage of an existing and/or incoming value ({@code existing} and {@code adding}) is
 * sufficient for {@code action} over {@code existingRanges} and, when it is a different instance,
 * {@code additionalRanges}.
 *
 * @param kind a label used only in exception messages ("txn" or "deps" at the call sites in this chunk)
 * @param obj  the incoming value, used only in exception messages
 * @return true if coverage is sufficient; false if information is missing and the caller should not proceed
 * @throws IllegalArgumentException if {@code Set} is given a value that does not cover the required ranges, or if
 *         {@code Add} cannot cover {@code additionalRanges}
 * @throws IllegalStateException if {@code action} is not handled here
 */
private static boolean validate(EnsureAction action, Ranges existingRanges, Ranges additionalRanges,
                                Ranges existing, Ranges adding, String kind, Object obj)
{
    switch (action)
    {
        default: throw new IllegalStateException();
        case Ignore:
            // nothing to validate
            break;

        case TrySet:
            if (adding != null)
            {
                // insufficient incoming coverage is not an error for TrySet; report it to the caller instead
                if (!adding.containsAll(existingRanges))
                    return false;

                if (additionalRanges != existingRanges && !adding.containsAll(additionalRanges))
                    return false;

                break;
            }
            // fall through with adding == null: the Set invariant below then fails
        case Set:
            // failing any of these tests is always an illegal state
            Invariants.checkState(adding != null);
            if (!adding.containsAll(existingRanges))
                throw new IllegalArgumentException("Incomplete " + kind + " (" + obj + ") provided; does not cover " + existingRanges);

            if (additionalRanges != existingRanges && !adding.containsAll(additionalRanges))
                throw new IllegalArgumentException("Incomplete " + kind + " (" + obj + ") provided; does not cover " + additionalRanges);
            break;

        case Check:
        case Add:
            if (adding == null)
            {
                // nothing incoming: the existing value alone must suffice
                if (existing == null)
                    return false;

                Invariants.checkState(existing.containsAll(existingRanges));
                if (existingRanges != additionalRanges && !existing.containsAll(additionalRanges))
                {
                    // Check only reports insufficiency; Add treats it as an illegal argument
                    if (action == Check)
                        return false;

                    throw new IllegalArgumentException("Missing additional " + kind + "; existing does not cover " + additionalRanges.difference(existingRanges));
                }
            }
            else if (existing != null)
            {
                // both present: their combined coverage must suffice
                Ranges covering = adding.with(existing);
                Invariants.checkState(covering.containsAll(existingRanges));
                if (existingRanges != additionalRanges && !covering.containsAll(additionalRanges))
                {
                    if (action == Check)
                        return false;

                    throw new IllegalArgumentException("Incomplete additional " + kind + " (" + obj + ") provided; does not cover " + additionalRanges.difference(existingRanges));
                }
            }
            else
            {
                // only the incoming value: it alone must suffice
                if (!adding.containsAll(existingRanges))
                    return false;

                if (existingRanges != additionalRanges && !adding.containsAll(additionalRanges))
                {
                    if (action == Check)
                        return false;

                    throw new IllegalArgumentException("Incomplete additional " + kind + " (" + obj + ") provided; does not cover " + additionalRanges.difference(existingRanges));
                }
            }
            break;
    }

    return true;
}
| |
| // TODO (low priority, API): this is an ugly hack, need to encode progress/homeKey/Route state combinations much more clearly |
| // (perhaps introduce encapsulating class representing each possible arrangement) |
| static class NoProgressKey implements RoutingKey |
| { |
| @Override |
| public int compareTo(@Nonnull RoutableKey that) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Range asRange() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
/**
 * Sentinel recorded as a command's progress key to mean "no progress key". It is compared by reference
 * ({@code ==}) within this class.
 */
private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
| } |