blob: ef3709fb9a20c33e40c52a8f0392e1e7bd7ed5e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import accord.api.*;
import accord.local.Status.Durability;
import accord.local.Status.Known;
import accord.primitives.*;
import accord.primitives.Writes;
import accord.utils.Invariants;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import static accord.local.Status.*;
import static accord.local.Status.Known.*;
import static accord.local.Status.Known.Done;
import static accord.local.Status.Known.ExecuteAtOnly;
import static accord.primitives.Route.isFullRoute;
import static accord.utils.Utils.listOf;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.ProgressLog.ProgressShard;
import accord.primitives.Ranges;
import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.api.Result;
import accord.api.RoutingKey;
import static accord.api.ProgressLog.ProgressShard.Home;
import static accord.api.ProgressLog.ProgressShard.Local;
import static accord.api.ProgressLog.ProgressShard.No;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.local.Command.EnsureAction.Add;
import static accord.local.Command.EnsureAction.Check;
import static accord.local.Command.EnsureAction.Ignore;
import static accord.local.Command.EnsureAction.Set;
import static accord.local.Command.EnsureAction.TrySet;
public abstract class Command implements CommandListener, BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
{
// SLF4J logger shared by all commands; used throughout this class for trace-level protocol diagnostics.
private static final Logger logger = LoggerFactory.getLogger(Command.class);
/** Returns the {@link TxnId} identifying this command. */
public abstract TxnId txnId();
// TODO (desirable, API consistency): should any of these calls be replaced by corresponding known() registers?
/**
 * Tests this command's current {@link Status} against the supplied one.
 *
 * @param status the status to test for
 * @return whatever {@code status().hasBeen(status)} reports for the current status
 */
public boolean hasBeen(Status status)
{
    Status current = status();
    return current.hasBeen(status);
}
/**
 * Tests whether the knowledge recorded in the current save status satisfies the requested {@link Known}.
 *
 * @param known the knowledge requirement to test
 * @return the result of {@code known.isSatisfiedBy(...)} applied to the current save status' {@code known}
 */
public boolean has(Known known)
{
    Known current = saveStatus().known;
    return known.isSatisfiedBy(current);
}
/**
 * Tests whether the currently known definition is at least the supplied one.
 *
 * @param definition the minimum definition level required
 * @return true iff {@code known().definition} compares greater than or equal to {@code definition}
 */
public boolean has(Definition definition)
{
    int comparison = known().definition.compareTo(definition);
    return comparison >= 0;
}
/**
 * Tests whether the currently known outcome is at least the supplied one.
 *
 * @param outcome the minimum outcome level required
 * @return true iff {@code known().outcome} compares greater than or equal to {@code outcome}
 */
public boolean has(Outcome outcome)
{
    int comparison = known().outcome.compareTo(outcome);
    return comparison >= 0;
}
/**
 * Tests whether the current status is exactly the supplied one (identity comparison).
 *
 * @param status the status to compare with
 * @return true iff {@code status()} is the same constant as {@code status}
 */
public boolean is(Status status)
{
    Status current = status();
    return current == status;
}
/**
 * homeKey is a global value that defines the home shard - the one tasked with ensuring the transaction is finished.
 * progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
 * This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
 * shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
 *
 * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
 * minimize IO interactions and discrete registers, so will likely reference commit log entries directly
 * At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
 * Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
 */
public abstract RoutingKey homeKey();
/** Stores the home key. Within this class it is only invoked when {@link #homeKey()} currently returns null. */
protected abstract void setHomeKey(RoutingKey key);
/** Returns the recorded progress key, or null if none has been recorded yet (see the description on {@link #homeKey()}). */
public abstract RoutingKey progressKey();
/** Stores the progress key. Within this class it is only invoked when {@link #progressKey()} currently returns null. */
protected abstract void setProgressKey(RoutingKey key);
/**
 * If this is the home shard, we require that this is a Route for all states &gt; NotWitnessed;
 * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
 * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
 *
 * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
 */
public abstract @Nullable Route<?> route();
/** Stores the route; written by {@code set(...)} after merging or slicing the supplied route. */
protected abstract void setRoute(Route<?> route);
/** Returns the locally held portion of the transaction definition; null is treated as "not witnessed" by {@link #hasBeenWitnessed()}. */
public abstract PartialTxn partialTxn();
/** Stores the locally held portion of the transaction definition; written by {@code set(...)}. */
protected abstract void setPartialTxn(PartialTxn txn);
/** Returns the ballot this command has promised; requests carrying a lower ballot are rejected by the accept/preaccept paths. */
public abstract Ballot promised();
/** Stores the promised ballot. */
protected abstract void setPromised(Ballot ballot);
/** Returns the ballot last recorded by {@code accept} or {@code acceptInvalidate}. */
public abstract Ballot accepted();
/** Stores the accepted ballot. */
protected abstract void setAccepted(Ballot ballot);
/** Returns the execution timestamp recorded so far; {@code preacceptOrRecover} treats null as "not yet assigned". */
public abstract Timestamp executeAt();
/** Stores the execution timestamp. */
protected abstract void setExecuteAt(Timestamp timestamp);
/**
 * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
 * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
 * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
 * but also for execution.
 */
public abstract PartialDeps partialDeps();
/** Stores the dependencies; {@code acceptInvalidate} clears them by passing null. */
protected abstract void setPartialDeps(PartialDeps deps);
/** Returns the writes recorded by {@code apply(...)}; consulted by {@code maybeExecute} and the internal apply step. */
public abstract Writes writes();
/** Stores the writes to be applied. */
protected abstract void setWrites(Writes writes);
/** Returns the result recorded by {@code apply(...)}. */
public abstract Result result();
/** Stores the transaction result. */
protected abstract void setResult(Result result);
/** Returns the persisted status; {@link #status()} and {@link #known()} are both derived from it. */
public abstract SaveStatus saveStatus();
/** Stores the persisted status. */
protected abstract void setSaveStatus(SaveStatus status);
/** Returns the {@link Status} component of the current save status. */
public Status status()
{
    SaveStatus current = saveStatus();
    return current.status;
}
/**
 * Updates the save status to the one {@code SaveStatus.get} derives from the supplied status
 * combined with the currently recorded {@link Known}.
 *
 * @param status the new status
 */
protected void setStatus(Status status)
{
    Known current = known();
    SaveStatus updated = SaveStatus.get(status, current);
    setSaveStatus(updated);
}
/** Returns the {@link Known} component of the current save status. */
public Known known()
{
    SaveStatus current = saveStatus();
    return current.known;
}
/** Returns the durability recorded for this command. */
public abstract Durability durability();
/** Stores the durability; the {@code SafeCommandStore} overload below delegates here after its consistency checks. */
public abstract void setDurability(Durability v);
/** Registers a listener on this command; used by dependants in {@code populateWaitingOn}. */
public abstract Command addListener(CommandListener listener);
/** Unregisters a listener; used by {@code updatePredecessor} once a dependency no longer needs to be watched. */
public abstract void removeListener(CommandListener listener);
/** Notifies registered listeners; invoked in this class after state transitions. */
protected abstract void notifyListeners(SafeCommandStore safeStore);
/** Records that this command is waiting for the given dependency to commit. */
protected abstract void addWaitingOnCommit(TxnId txnId);
/** Removes the given dependency from the waiting-on-commit collection. */
protected abstract void removeWaitingOnCommit(TxnId txnId);
/** Returns a dependency this command is waiting on to commit, or null if there is none (null is tested for in {@code NotifyWaitingOn}). */
protected abstract TxnId firstWaitingOnCommit();
/** Records, if not already present, that this command is waiting for the given dependency (with its executeAt) to apply. */
protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp executeAt);
/**
 * Returns a dependency this command is waiting on to apply, or null if there is none.
 * NOTE(review): the exact meaning of {@code ifExecutesBefore} is defined by implementations not visible here — confirm.
 */
protected abstract TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore);
/** Removes the given dependency (with its executeAt) from the waiting-on-apply collection. */
protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
/** Returns true while this command is still blocked on some dependency; {@code maybeExecute} does not proceed while this holds. */
protected abstract boolean isWaitingOnDependency();
/**
 * Reports whether any part of the transaction definition is held locally.
 *
 * @return true iff {@link #partialTxn()} is non-null
 */
public boolean hasBeenWitnessed()
{
    PartialTxn txn = partialTxn();
    return txn != null;
}
/** {@link PreLoadContext} view of this command: the only transaction id involved is its own. */
@Override
public Iterable<TxnId> txnIds()
{
    TxnId self = txnId();
    return Collections.singleton(self);
}
/** {@link PreLoadContext} view of this command: the keys of the locally held partial transaction. */
@Override
public Seekables<?, ?> keys()
{
    // TODO (expected, consider): when do we need this, and will it always be sufficient?
    PartialTxn txn = partialTxn();
    return txn.keys();
}
/**
 * Records the durability of this command, first making sure the home key is known and
 * reporting any disagreement about the execution timestamp.
 *
 * @param safeStore  the store the command belongs to
 * @param durability the durability to record
 * @param homeKey    the home key reported alongside the durability
 * @param executeAt  the execution timestamp reported by the caller, or null if unknown
 */
public void setDurability(SafeCommandStore safeStore, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
{
    updateHomeKey(safeStore, homeKey);

    // only compare timestamps once ours has been decided
    boolean comparable = executeAt != null && hasBeen(PreCommitted);
    if (comparable && !this.executeAt().equals(executeAt))
    {
        safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
    }

    setDurability(durability);
}
/** Result of a preaccept, recover, accept or acceptInvalidate request. */
public enum AcceptOutcome
{
    /** The request was processed. */
    Success,
    /** The request was not needed given the state already recorded. */
    Redundant,
    /** A higher ballot has already been promised. */
    RejectedBallot
}
/**
 * Pre-accepts the transaction on behalf of its original coordinator, i.e. with {@code Ballot.ZERO}.
 *
 * @return the outcome of the shared preaccept/recover logic
 */
public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
{
    Ballot initialBallot = Ballot.ZERO;
    return preacceptOrRecover(safeStore, partialTxn, route, progressKey, initialBallot);
}
/**
 * Witnesses the transaction on behalf of a recovery coordinator using the supplied ballot.
 *
 * @return the outcome of the shared preaccept/recover logic
 */
public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
    AcceptOutcome outcome = preacceptOrRecover(safeStore, partialTxn, route, progressKey, ballot);
    return outcome;
}
/**
 * Shared implementation of {@code preaccept} (called with {@code Ballot.ZERO}) and {@code recover}.
 *
 * Steps, in order:
 * 1. reject if a higher ballot has been promised; otherwise record the ballot as promised if it is higher;
 * 2. if the definition is already known, return without changing anything else;
 * 3. validate the supplied route and partial txn against the ranges owned at txnId.epoch;
 * 4. assign an executeAt if there is none (moving NotWitnessed to PreAccepted), or otherwise enrich the
 *    save status with the definition;
 * 5. store the route and partial txn, then notify listeners.
 *
 * @return RejectedBallot if a higher ballot was promised; Redundant if the definition was already known and
 *         the ballot is ZERO; Success otherwise
 * @throws IllegalStateException if validation of the supplied route/txn fails
 */
private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
{
int compareBallots = promised().compareTo(ballot);
if (compareBallots > 0)
{
logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId(), promised());
return AcceptOutcome.RejectedBallot;
}
else if (compareBallots < 0)
{
// save the new ballot as a promise
setPromised(ballot);
}
if (known().definition.isKnown())
{
Invariants.checkState(status() == Invalidated || executeAt() != null);
logger.trace("{}: skipping preaccept - already known ({})", txnId(), status());
// in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
// preaccept; in the former case we should abandon coordination, and in the latter we have already completed
return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
}
Ranges coordinateRanges = coordinateRanges(safeStore);
Invariants.checkState(!coordinateRanges.isEmpty());
// note: progressShard(...) also records the home key and progress key as a side effect
ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
throw new IllegalStateException();
if (executeAt() == null)
{
TxnId txnId = txnId();
// unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
// - use a global logical clock to issue new timestamps; or
// - assign each shard _and_ process a unique id, and use both as components of the timestamp
// if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
// invalidate any transactions that were not completed by their initial coordinator
if (ballot.equals(Ballot.ZERO)) setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
else setExecuteAt(safeStore.time().uniqueNow(txnId));
if (status() == NotWitnessed)
setStatus(PreAccepted);
safeStore.progressLog().preaccepted(this, shard);
}
else
{
// TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
}
// persist the route and definition only after status/executeAt have been updated above
set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
notifyListeners(safeStore);
return AcceptOutcome.Success;
}
/**
 * Attempts to promise the supplied ballot as the first step of invalidation.
 *
 * @param ballot the ballot of the invalidating coordinator
 * @return true if the ballot was recorded as promised; false if a higher ballot was already promised
 */
public boolean preacceptInvalidate(Ballot ballot)
{
    boolean superseded = promised().compareTo(ballot) > 0;
    if (!superseded)
    {
        setPromised(ballot);
        return true;
    }

    logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", txnId(), promised());
    return false;
}
/**
 * Handles an Accept request proposing {@code executeAt} and {@code partialDeps} under {@code ballot}.
 *
 * Rejects the request if a higher ballot has been promised, and treats it as redundant if the command has
 * already been pre-committed. Otherwise records the executeAt, ballot (as both promised and accepted),
 * route and deps, registers the keys with the store if the definition is not yet known, moves to
 * Accepted, informs the progress log and notifies listeners.
 *
 * @return RejectedBallot, Redundant or Success as described above
 * @throws AssertionError if validation of the supplied route/deps fails
 */
public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
{
if (this.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
return AcceptOutcome.RejectedBallot;
}
if (hasBeen(PreCommitted))
{
logger.trace("{}: skipping accept - already committed ({})", txnId(), status());
return AcceptOutcome.Redundant;
}
TxnId txnId = txnId();
Ranges coordinateRanges = coordinateRanges(safeStore);
// when executeAt falls in a different epoch, accept over the ranges owned between the two epochs
Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
Invariants.checkState(!acceptRanges.isEmpty());
ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
throw new AssertionError("Invalid response from validate function");
setExecuteAt(executeAt);
setPromised(ballot);
setAccepted(ballot);
// TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
// distribute partialDeps at all, since all we gain is not waiting for these transactions to commit during
// recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
set(safeStore, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
// set only registers by transaction keys, which we mightn't already have received
if (!known().isDefinitionKnown())
safeStore.register(keys, acceptRanges, this);
setStatus(Accepted);
safeStore.progressLog().accepted(this, shard);
notifyListeners(safeStore);
return AcceptOutcome.Success;
}
/**
 * Handles an Accept request that proposes invalidating the transaction under {@code ballot}.
 *
 * Rejects the request if a higher ballot has been promised, and treats it as redundant if the command has
 * already been pre-committed. Otherwise records the ballot as promised and accepted, moves to
 * AcceptedInvalidate, clears any stored deps and notifies listeners.
 *
 * @return RejectedBallot, Redundant or Success as described above
 */
public AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, Ballot ballot)
{
if (this.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
return AcceptOutcome.RejectedBallot;
}
if (hasBeen(PreCommitted))
{
logger.trace("{}: skipping accept invalidated - already committed ({})", txnId(), status());
return AcceptOutcome.Redundant;
}
setPromised(ballot);
setAccepted(ballot);
setStatus(AcceptedInvalidate);
// any deps recorded by an earlier accept are discarded
setPartialDeps(null);
logger.trace("{}: accepted invalidated", txnId());
notifyListeners(safeStore);
return AcceptOutcome.Success;
}
/** Result of a commit request. */
public enum CommitOutcome
{
    /** The commit was recorded. */
    Success,
    /** The command had already been committed. */
    Redundant,
    /** The supplied information failed validation, so nothing was recorded. */
    Insufficient
}
/**
 * Commits the transaction with the decided {@code executeAt} and {@code partialDeps}.
 *
 * If the command is already pre-committed, any disagreement with the recorded executeAt (or an Invalidated
 * status) is reported to the agent; if it is already Committed the call returns Redundant. Otherwise the
 * supplied route/txn/deps are validated, stored, the status moves to Committed, the waiting-on
 * collections are populated from the deps, and execution is attempted.
 *
 * @return Redundant if already committed; Insufficient if validation fails; Success otherwise
 */
// relies on mutual exclusion for each key
public CommitOutcome commit(SafeCommandStore safeStore, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
{
if (hasBeen(PreCommitted))
{
logger.trace("{}: skipping commit - already committed ({})", txnId(), status());
if (!executeAt.equals(executeAt()) || status() == Invalidated)
safeStore.agent().onInconsistentTimestamp(this, (status() == Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
// a command that is only PreCommitted falls through and is committed below
if (hasBeen(Committed))
return CommitOutcome.Redundant;
}
Ranges coordinateRanges = coordinateRanges(safeStore);
// TODO (expected, consider): consider ranges between coordinateRanges and executeRanges? Perhaps don't need them
Ranges executeRanges = executeRanges(safeStore, executeAt);
ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
if (!validate(coordinateRanges, executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
return CommitOutcome.Insufficient;
setExecuteAt(executeAt);
set(safeStore, coordinateRanges, executeRanges, shard, route, partialTxn, Add, partialDeps, Set);
setStatus(Committed);
logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(), executeAt, partialDeps);
populateWaitingOn(safeStore);
safeStore.progressLog().committed(this, shard);
// TODO (expected, safety): introduce intermediate status to avoid reentry when notifying listeners (which might notify us)
// listeners are notified inside maybeExecute (alwaysNotifyListeners = true)
maybeExecute(safeStore, shard, true, true);
return CommitOutcome.Success;
}
/**
 * Records the decided {@code executeAt} and moves the command to PreCommitted.
 *
 * If the command is already pre-committed with the same executeAt (and is not Invalidated) this is a no-op.
 * If it is already pre-committed with a different executeAt, or is Invalidated, the inconsistency is reported
 * to the agent and the executeAt/status are then overwritten below.
 */
// relies on mutual exclusion for each key
public void precommit(SafeCommandStore safeStore, Timestamp executeAt)
{
if (hasBeen(PreCommitted))
{
logger.trace("{}: skipping precommit - already committed ({})", txnId(), status());
if (executeAt.equals(executeAt()) && status() != Invalidated)
return;
safeStore.agent().onInconsistentTimestamp(this, (status() == Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
}
setExecuteAt(executeAt);
setStatus(PreCommitted);
notifyListeners(safeStore);
logger.trace("{}: precommitted with executeAt: {}", txnId(), executeAt);
}
/**
 * Initialises the waiting-on collections from {@link #partialDeps()}, restricted to the ranges the store
 * reports as owned since executeAt.epoch.
 *
 * For each dependency:
 * - not loaded: wait on its commit and register this command as a listener via the store;
 * - not yet pre-committed: listen to it and wait on its commit;
 * - pre-committed or later (but not invalidated): listen to it and let {@code insertPredecessor} decide
 *   whether it must be waited on to apply;
 * - invalidated: ignored.
 */
protected void populateWaitingOn(SafeCommandStore safeStore)
{
Ranges ranges = safeStore.ranges().since(executeAt().epoch());
if (ranges != null)
{
partialDeps().forEach(ranges, txnId -> {
Command command = safeStore.ifLoaded(txnId);
if (command == null)
{
addWaitingOnCommit(txnId);
safeStore.addAndInvokeListener(txnId, this);
}
else
{
switch (command.status())
{
default:
throw new IllegalStateException();
case NotWitnessed:
case PreAccepted:
case Accepted:
case AcceptedInvalidate:
// we don't know when these dependencies will execute, and cannot execute until we do
command.addListener(this);
addWaitingOnCommit(command.txnId());
break;
case PreCommitted:
case Committed:
// TODO (desired, efficiency): split into ReadyToRead and ReadyToWrite;
// the distributed read can be performed as soon as those keys are ready,
// and in parallel with any other reads. the client can even ACK immediately after;
// only the write needs to be postponed until other in-progress reads complete
case ReadyToExecute:
case PreApplied:
case Applied:
command.addListener(this);
insertPredecessor(command);
// falls through to the Invalidated case, which only breaks
case Invalidated:
break;
}
}
});
}
}
/**
 * Marks the transaction as Invalidated.
 *
 * If the command has already been pre-committed nothing is changed; if in addition it is not already
 * Invalidated, the conflict is reported to the agent. Otherwise the progress log is informed, executeAt is
 * set to the txnId, missing deps are replaced with {@code PartialDeps.NONE}, the status becomes Invalidated
 * and listeners are notified.
 */
// TODO (expected, ?): commitInvalidate may need to update cfks _if_ possible
public void commitInvalidate(SafeCommandStore safeStore)
{
if (hasBeen(PreCommitted))
{
logger.trace("{}: skipping commit invalidated - already committed ({})", txnId(), status());
if (!hasBeen(Invalidated))
safeStore.agent().onInconsistentTimestamp(this, Timestamp.NONE, executeAt());
return;
}
ProgressShard shard = progressShard(safeStore);
safeStore.progressLog().invalidated(this, shard);
setExecuteAt(txnId());
if (partialDeps() == null)
setPartialDeps(PartialDeps.NONE);
setStatus(Invalidated);
logger.trace("{}: committed invalidated", txnId());
notifyListeners(safeStore);
}
/** Result of an apply request. */
public enum ApplyOutcome
{
    /** The writes and result were recorded. */
    Success,
    /** The command had already reached PreApplied with the same executeAt. */
    Redundant,
    /** The supplied information failed validation, so nothing was recorded. */
    Insufficient
}
/**
 * Records the outcome (writes and result) of the transaction and moves the command to PreApplied.
 *
 * Returns Redundant if the command is already PreApplied with the same executeAt; reports an inconsistency
 * to the agent if it is pre-committed with a different executeAt. After validation, stores the writes,
 * result, executeAt, route and deps; if the command had not previously been Committed, the waiting-on
 * collections are populated here. Finally attempts execution.
 *
 * @param untilEpoch if lower than the store's latest epoch, the ranges between executeAt.epoch and this epoch
 *                   are asserted to contain the execute ranges
 * @return Redundant, Insufficient (validation failed) or Success
 */
public ApplyOutcome apply(SafeCommandStore safeStore, long untilEpoch, Route<?> route, Timestamp executeAt, @Nullable PartialDeps partialDeps, Writes writes, Result result)
{
if (hasBeen(PreApplied) && executeAt.equals(this.executeAt()))
{
logger.trace("{}: skipping apply - already executed ({})", txnId(), status());
return ApplyOutcome.Redundant;
}
else if (hasBeen(PreCommitted) && !executeAt.equals(this.executeAt()))
{
safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
}
Ranges coordinateRanges = coordinateRanges(safeStore);
Ranges executeRanges = executeRanges(safeStore, executeAt);
if (untilEpoch < safeStore.latestEpoch())
{
Ranges expectedRanges = safeStore.ranges().between(executeAt.epoch(), untilEpoch);
Invariants.checkState(expectedRanges.containsAll(executeRanges));
}
ProgressShard shard = progressShard(safeStore, route, coordinateRanges);
// deps are merged (Add) if we already committed, otherwise set only if they cover the required ranges (TrySet)
if (!validate(coordinateRanges, executeRanges, shard, route, Check, null, Check, partialDeps, hasBeen(Committed) ? Add : TrySet))
return ApplyOutcome.Insufficient; // TODO (expected, consider): this should probably be an assertion failure if !TrySet
setWrites(writes);
setResult(result);
setExecuteAt(executeAt);
set(safeStore, coordinateRanges, executeRanges, shard, route, null, Check, partialDeps, hasBeen(Committed) ? Add : TrySet);
// must be evaluated before the status changes to PreApplied below
if (!hasBeen(Committed))
populateWaitingOn(safeStore);
setStatus(PreApplied);
logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId(), executeAt, partialDeps);
safeStore.progressLog().executed(this, shard);
maybeExecute(safeStore, shard, true, true);
return ApplyOutcome.Success;
}
/**
 * Builds the context needed to notify this command as a listener: this command plus the caller.
 *
 * @param caller the transaction whose change triggers the notification
 * @return a context covering this command's id and {@code caller}
 */
@Override
public PreLoadContext listenerPreLoadContext(TxnId caller)
{
    TxnId self = txnId();
    return PreLoadContext.contextFor(listOf(self, caller));
}
/**
 * Listener callback invoked when a command this one is listening to changes.
 *
 * Changes to commands that have not yet been pre-committed are ignored; for any later status the
 * predecessor bookkeeping is refreshed and execution of this command is attempted.
 *
 * @param safeStore the store both commands belong to
 * @param command   the command that changed
 * @throws IllegalStateException if the changed command reports a status not handled here
 */
@Override
public void onChange(SafeCommandStore safeStore, Command command)
{
    logger.trace("{}: updating as listener in response to change on {} with status {} ({})",
                 txnId(), command.txnId(), command.status(), command);

    switch (command.status())
    {
        case NotWitnessed:
        case PreAccepted:
        case Accepted:
        case AcceptedInvalidate:
            // nothing decided yet; keep waiting
            return;

        case PreCommitted:
        case Committed:
        case ReadyToExecute:
        case PreApplied:
        case Applied:
        case Invalidated:
            updatePredecessor(command);
            maybeExecute(safeStore, progressShard(safeStore), false, true);
            return;

        default:
            throw new IllegalStateException();
    }
}
/**
 * Completes application of the command: marks it Applied and tells listeners.
 *
 * @param safeStore the store the command belongs to
 */
protected void postApply(SafeCommandStore safeStore)
{
    TxnId id = txnId();
    logger.trace("{} applied, setting status to Applied and notifying listeners", id);

    setStatus(Applied);
    notifyListeners(safeStore);
}
/**
 * Creates a task that looks the command up by id in the store it is given and invokes
 * {@code postApply} on it. Only the id is captured, not a command instance.
 *
 * @param txnId the id of the command to complete
 * @return a function suitable for submission to a command store; always yields null
 */
private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
{
    return new Function<SafeCommandStore, Void>()
    {
        @Override
        public Void apply(SafeCommandStore safeStore)
        {
            Command command = safeStore.command(txnId);
            command.postApply(safeStore);
            return null;
        }
    };
}
/**
 * Applies this command's writes and, once they complete, submits a task to the command store that calls
 * {@code postApply} on the command looked up by its txnId.
 *
 * @return a future completing when the submitted post-apply task completes
 */
protected Future<Void> apply(SafeCommandStore safeStore)
{
// important: we can't include a reference to *this* in the lambda, since the C* implementation may evict
// the command instance from memory between now and the write completing (and post apply being called)
// NOTE(review): the lambda below does still reference this instance - it passes `this` to submit(...) and
// evaluates txnId() lazily - which appears to conflict with the constraint stated above; confirm whether
// the context and txnId should be captured in locals before the lambda is created.
CommandStore unsafeStore = safeStore.commandStore();
return writes().apply(safeStore).flatMap(unused ->
unsafeStore.submit(this, callPostApply(txnId()))
);
}
/**
 * Performs the read for this command by delegating to the locally held partial transaction.
 *
 * @param safeStore the store to read from
 * @return the future produced by {@code partialTxn().read(...)}
 */
public Future<Data> read(SafeCommandStore safeStore)
{
    PartialTxn txn = partialTxn();
    return txn.read(safeStore, this);
}
/**
 * Advances the command if it is Committed or PreApplied and no longer waiting on any dependency.
 *
 * - Committed becomes ReadyToExecute (progress log informed, listeners notified).
 * - PreApplied either starts applying its writes (if they intersect the ranges owned since executeAt.epoch)
 *   or is marked Applied immediately as a no-op.
 *
 * @param alwaysNotifyListeners if true, listeners are notified even when no transition is made
 * @param notifyWaitingOn       if true and still blocked on a dependency, a {@code NotifyWaitingOn} pass is run
 * @return true if a transition was made (or an apply was started); false if the status was not eligible or
 *         the command is still waiting on a dependency
 */
// TODO (expected, API consistency): maybe split into maybeExecute and maybeApply?
private boolean maybeExecute(SafeCommandStore safeStore, ProgressShard shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
{
if (logger.isTraceEnabled())
logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", txnId(), status(), alwaysNotifyListeners);
if (status() != Committed && status() != PreApplied)
{
if (alwaysNotifyListeners)
notifyListeners(safeStore);
return false;
}
if (isWaitingOnDependency())
{
if (alwaysNotifyListeners)
notifyListeners(safeStore);
if (notifyWaitingOn)
new NotifyWaitingOn(this).accept(safeStore);
return false;
}
// only Committed and PreApplied can reach this switch (see the status check above)
switch (status())
{
case Committed:
// TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
setStatus(ReadyToExecute);
logger.trace("{}: set to ReadyToExecute", txnId());
safeStore.progressLog().readyToExecute(this, shard);
notifyListeners(safeStore);
break;
case PreApplied:
Ranges executeRanges = executeRanges(safeStore, executeAt());
boolean intersects = writes().keys.intersects(executeRanges);
if (intersects)
{
logger.trace("{}: applying", txnId());
// the Applied status is set later by postApply, once the writes have completed
apply(safeStore);
}
else
{
// TODO (desirable, performance): This could be performed immediately upon Committed
// but: if we later support transitive dependency elision this could be dangerous
logger.trace("{}: applying no-op", txnId());
setStatus(Applied);
notifyListeners(safeStore);
}
}
return true;
}
/**
 * Updates the waiting-on bookkeeping for a dependency whose executeAt has been decided (or which has been
 * invalidated).
 *
 * The dependency is dropped (and this command stops listening to it) if it is invalidated, executes after
 * this command, or has already been applied. Otherwise, provided this command is still waiting on some
 * dependency, it is moved from waiting-on-commit to waiting-on-apply.
 *
 * @param dependency is either committed or invalidated
 * @return true iff {@code maybeExecute} might now have a different outcome
 * @throws IllegalStateException if none of the cases applies, i.e. the dependency must still be waited on
 *         but this command is not waiting on any dependency
 */
private boolean updatePredecessor(Command dependency)
{
Invariants.checkState(dependency.hasBeen(PreCommitted));
if (dependency.hasBeen(Invalidated))
{
logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", txnId(), dependency.txnId());
dependency.removeListener(this);
removeWaitingOnCommit(dependency.txnId());
return true;
}
else if (dependency.executeAt().compareTo(executeAt()) > 0)
{
// dependency cannot be a predecessor if it executes later
logger.trace("{}: {} executes after us. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
removeWaitingOn(dependency.txnId(), dependency.executeAt());
dependency.removeListener(this);
return true;
}
else if (dependency.hasBeen(Applied))
{
logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
removeWaitingOn(dependency.txnId(), dependency.executeAt());
dependency.removeListener(this);
return true;
}
else if (isWaitingOnDependency())
{
// the dependency executes before us and has not applied yet: keep listening and wait for it to apply
logger.trace("{}: adding {} to waiting on apply set.", txnId(), dependency.txnId());
addWaitingOnApplyIfAbsent(dependency.txnId(), dependency.executeAt());
removeWaitingOnCommit(dependency.txnId());
return false;
}
else
{
throw new IllegalStateException();
}
}
/**
 * Adds a dependency to the waiting-on-apply collection when it genuinely precedes this command.
 *
 * Nothing is inserted if the dependency is invalidated, executes after this command, or has already
 * been applied.
 *
 * @param dependency a dependency that has at least been pre-committed
 */
private void insertPredecessor(Command dependency)
{
    Invariants.checkState(dependency.hasBeen(PreCommitted));

    if (dependency.hasBeen(Invalidated))
    {
        logger.trace("{}: {} is invalidated. Do not insert.", txnId(), dependency.txnId());
        return;
    }

    if (dependency.executeAt().compareTo(executeAt()) > 0)
    {
        // dependency cannot be a predecessor if it executes later
        logger.trace("{}: {} executes after us. Do not insert.", txnId(), dependency.txnId());
        return;
    }

    if (dependency.hasBeen(Applied))
    {
        logger.trace("{}: {} has been applied. Do not insert.", txnId(), dependency.txnId());
        return;
    }

    logger.trace("{}: adding {} to waiting on apply set.", txnId(), dependency.txnId());
    addWaitingOnApplyIfAbsent(dependency.txnId(), dependency.executeAt());
}
/**
 * Refreshes the bookkeeping for {@code predecessor} and, if that may have unblocked this command,
 * attempts to execute it. Does nothing once this command has been applied.
 *
 * @param notifyWaitingOn forwarded to {@code maybeExecute}
 */
void updatePredecessorAndMaybeExecute(SafeCommandStore safeStore, Command predecessor, boolean notifyWaitingOn)
{
    if (!hasBeen(Applied) && updatePredecessor(predecessor))
    {
        maybeExecute(safeStore, progressShard(safeStore), false, notifyWaitingOn);
    }
}
/**
 * Walks the chain of dependencies that is blocking a command, and reports the command found at the end of
 * the chain to the progress log.
 *
 * The walk is kept as an explicit stack ({@code txnIds} / {@code blockedUntil}, top at index {@code depth})
 * so that it can be re-submitted to the store, with the instance acting as its own {@link PreLoadContext},
 * whenever a command on the stack is not loaded.
 */
static class NotifyWaitingOn implements PreLoadContext, Consumer<SafeCommandStore>
{
// blockedUntil[i] is the knowledge txnIds[i] must reach before the entry below it can make progress
Known[] blockedUntil = new Known[4];
TxnId[] txnIds = new TxnId[4];
// index of the top of the stack
int depth;
/** Seeds the stack with the blocked command itself, which is considered blocked until Done. */
public NotifyWaitingOn(Command command)
{
txnIds[0] = command.txnId();
blockedUntil[0] = Done;
}
/**
 * Runs (or resumes) the walk. Returns once the root command is no longer blocked, once the progress log
 * has been told what the chain is waiting for, or after re-scheduling itself because a command must be
 * loaded.
 */
@Override
public void accept(SafeCommandStore safeStore)
{
// prev is the command directly below the top of the stack, i.e. the one waiting on cur
Command prev = get(safeStore, depth - 1);
while (depth >= 0)
{
Command cur = safeStore.ifLoaded(txnIds[depth]);
Known until = blockedUntil[depth];
if (cur == null)
{
// need to load; schedule execution for later
safeStore.execute(this, this);
return;
}
if (prev != null)
{
// cur no longer blocks prev if it has reached the required state, or if it is now known to execute after prev
if (cur.has(until) || (cur.hasBeen(PreCommitted) && cur.executeAt().compareTo(prev.executeAt()) > 0))
{
prev.updatePredecessorAndMaybeExecute(safeStore, cur, false);
--depth;
prev = get(safeStore, depth - 1);
continue;
}
}
else if (cur.has(until))
{
// we're done; have already applied
Invariants.checkState(depth == 0);
break;
}
TxnId directlyBlockedOnCommit = cur.firstWaitingOnCommit();
TxnId directlyBlockedOnApply = cur.firstWaitingOnApply(directlyBlockedOnCommit);
if (directlyBlockedOnApply != null)
{
push(directlyBlockedOnApply, Done);
}
else if (directlyBlockedOnCommit != null)
{
push(directlyBlockedOnCommit, ExecuteAtOnly);
}
else
{
// cur is not waiting on anything we know of
if (cur.hasBeen(Committed) && !cur.hasBeen(ReadyToExecute) && !cur.isWaitingOnDependency())
{
if (!cur.maybeExecute(safeStore, cur.progressShard(safeStore), false, false))
throw new AssertionError("Is able to Apply, but has not done so");
// loop and re-test the command's status; we may still want to notify blocking, esp. if not homeShard
continue;
}
Unseekables<?, ?> someKeys = cur.maxUnseekables();
if (someKeys == null && prev != null) someKeys = prev.partialDeps().someUnseekables(cur.txnId());
Invariants.checkState(someKeys != null);
logger.trace("{} blocked on {} until {}", txnIds[0], cur.txnId(), until);
safeStore.progressLog().waiting(cur.txnId(), until, someKeys);
return;
}
// a new entry was pushed: the command just examined becomes the waiter for the next iteration
prev = cur;
}
}
/** Returns the command at stack index {@code i}, or null for a negative index. */
private Command get(SafeCommandStore safeStore, int i)
{
return i >= 0 ? safeStore.command(txnIds[i]) : null;
}
/** Pushes a new entry, doubling both arrays when the stack is full. */
void push(TxnId by, Known until)
{
if (++depth == txnIds.length)
{
txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
// txnIds has already been doubled, so its length is the new capacity for both arrays
blockedUntil = Arrays.copyOf(blockedUntil, txnIds.length);
}
txnIds[depth] = by;
blockedUntil[depth] = until;
}
/** The ids currently on the stack (indices 0..depth), as a view over the backing array. */
@Override
public Iterable<TxnId> txnIds()
{
return Arrays.asList(txnIds).subList(0, depth + 1);
}
/** No keys are requested for this context. */
@Override
public Seekables<?, ?> keys()
{
return Keys.EMPTY;
}
}
/**
 * Records the key nominated to represent the "home" shard. Only members of the home shard may be
 * nominated to recover a transaction, which reduces the cluster-wide overhead of ensuring progress.
 * A transaction that has only been witnessed at PreAccept may however trigger a process of ensuring
 * the home shard is durably informed of the transaction.
 *
 * Note that for ProgressLog purposes the "home shard" is the shard as of txnId.epoch.
 * For recovery purposes the "home shard" is as of txnId.epoch until Committed, and executeAt.epoch once Executed
 *
 * @param homeKey the home key to record
 * @throws AssertionError if a different home key has already been recorded
 */
public final void homeKey(RoutingKey homeKey)
{
    RoutingKey existing = homeKey();
    if (existing == null)
    {
        setHomeKey(homeKey);
        return;
    }

    if (!existing.equals(homeKey))
        throw new AssertionError();
}
/**
 * Records the home key if none is known yet; when this store owns it as of txnId.epoch and no progress
 * key has been chosen, the home key also becomes the progress key.
 *
 * @param safeStore the store the command belongs to
 * @param homeKey   the home key to record
 * @throws IllegalStateException if a different home key has already been recorded
 */
public void updateHomeKey(SafeCommandStore safeStore, RoutingKey homeKey)
{
    if (homeKey() != null)
    {
        // already known: it must agree with what we were given
        if (!this.homeKey().equals(homeKey))
            throw new IllegalStateException();
        return;
    }

    setHomeKey(homeKey);
    // TODO (low priority, safety): if we're processed on a node that does not know the latest epoch,
    //      do we guarantee the home key calculation is unchanged since the prior epoch?
    if (progressKey() == null && owns(safeStore, txnId().epoch(), homeKey))
        progressKey(homeKey);
}
/**
 * Records the home key and progress key supplied with a request, and classifies this store's role in
 * tracking progress for the command.
 *
 * @param route            the route of the request; supplies the home key
 * @param progressKey      the progress key nominated by the request, possibly null or NO_PROGRESS_KEY
 * @param coordinateRanges the ranges owned as of txnId.epoch
 * @return No if no progress key is nominated or it is not owned; Home if it equals the home key; Local otherwise
 * @throws AssertionError if a different progress key has already been recorded
 */
private ProgressShard progressShard(SafeCommandStore safeStore, Route<?> route, @Nullable RoutingKey progressKey, Ranges coordinateRanges)
{
    updateHomeKey(safeStore, route.homeKey());

    boolean nominated = progressKey != null && progressKey != NO_PROGRESS_KEY;
    if (!nominated)
    {
        // remember that we were told there is no progress key, unless one is already recorded
        if (this.progressKey() == null)
            setProgressKey(NO_PROGRESS_KEY);
        return No;
    }

    if (this.progressKey() == null)
        setProgressKey(progressKey);
    else if (!this.progressKey().equals(progressKey))
        throw new AssertionError();

    if (coordinateRanges.contains(progressKey))
        return progressKey.equals(homeKey()) ? Home : Local;

    return No;
}
/**
 * Records the key nominated to be the primary shard within this node for managing progress of the
 * command. It is nominated only as of txnId.epoch, and may be null (indicating that this node does not
 * monitor the progress of this command).
 *
 * Preferentially, this is homeKey on nodes that replicate it, and otherwise any key that is replicated, as of txnId.epoch
 *
 * @param progressKey the progress key to record
 * @throws AssertionError if a different progress key has already been recorded
 */
public final void progressKey(RoutingKey progressKey)
{
    RoutingKey existing = progressKey();
    if (existing == null)
    {
        setProgressKey(progressKey);
        return;
    }

    if (!existing.equals(progressKey))
        throw new AssertionError();
}
/**
 * Classifies this store's progress role using the progress key already recorded on the command.
 *
 * @return Unsure if no progress key has been recorded; otherwise the result of the four-argument overload
 */
private ProgressShard progressShard(SafeCommandStore safeStore, Route<?> route, Ranges coordinateRanges)
{
    return progressKey() == null
           ? Unsure
           : progressShard(safeStore, route, progressKey(), coordinateRanges);
}
/**
 * Classifies this store's progress role purely from recorded state, without modifying anything.
 *
 * @return Unsure if no progress key is recorded; No if it is NO_PROGRESS_KEY or not owned as of txnId.epoch;
 *         Home if it equals the home key; Local otherwise
 */
private ProgressShard progressShard(SafeCommandStore safeStore)
{
    RoutingKey key = progressKey();
    if (key == null)
        return Unsure;
    if (key == NO_PROGRESS_KEY)
        return No;

    Ranges owned = safeStore.ranges().at(txnId().epoch());
    if (!owned.contains(key))
        return No;

    if (key.equals(homeKey()))
        return Home;
    return Local;
}
/** Returns the ranges the store reports at the epoch of this command's txnId. */
private Ranges coordinateRanges(SafeCommandStore safeStore)
{
    Ranges atTxnIdEpoch = safeStore.ranges().at(txnId().epoch());
    return atTxnIdEpoch;
}
/** Returns the ranges the store reports since the epoch of the supplied execution timestamp. */
private Ranges executeRanges(SafeCommandStore safeStore, Timestamp executeAt)
{
    Ranges sinceExecuteEpoch = safeStore.ranges().since(executeAt.epoch());
    return sinceExecuteEpoch;
}
/** How {@code validate} and {@code set} should treat a supplied route, partial txn or partial deps. */
enum EnsureAction
{
    /** The supplied value is neither checked nor stored. */
    Ignore,
    /** The supplied value is checked but not stored. */
    Check,
    /** The supplied value is combined with any existing value, or stored outright if there is none. */
    Add,
    /** The supplied value is stored if it passes validation; otherwise validation reports failure. */
    TrySet,
    /** The supplied value is stored. */
    Set
}
/**
 * Validate we have sufficient information for the route, partialTxn and partialDeps fields;
 * return false if not (or throw an exception if an illegal state is encountered).
 *
 * This method only checks: it calls no setters. The fields are written by the companion
 * {@code set(...)} method, which callers invoke after a successful validation.
 *
 * @param existingRanges    ranges already expected to be covered
 * @param additionalRanges  further ranges the supplied information must cover
 * @param shard             this store's progress role; Unsure always fails validation
 * @param ensureRoute       how strictly to check the route (only consulted on the home shard)
 * @param ensurePartialTxn  how to check the supplied partial txn
 * @param ensurePartialDeps how to check the supplied partial deps
 * @return true if the supplied information is sufficient
 */
private boolean validate(Ranges existingRanges, Ranges additionalRanges, ProgressShard shard,
Route<?> route, EnsureAction ensureRoute,
@Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
if (shard == Unsure)
return false;
// first validate route
if (shard.isHome())
{
switch (ensureRoute)
{
default: throw new AssertionError();
case Check:
// sufficient if either the stored or the supplied route is a full route
if (!isFullRoute(route()) && !isFullRoute(route))
return false;
// falls through to Ignore, which only breaks
case Ignore:
break;
case Add:
case Set:
if (!isFullRoute(route))
throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
break;
case TrySet:
if (!isFullRoute(route))
return false;
}
}
else
{
// failing any of these tests is always an illegal state
// NOTE(review): the first check below returns false rather than throwing, unlike the second - confirm intended
if (!route.covers(existingRanges))
return false;
if (existingRanges != additionalRanges && !route.covers(additionalRanges))
throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
}
// invalid to Add deps to Accepted or AcceptedInvalidate statuses, as Committed deps are not equivalent
// and we may erroneously believe we have covered a wider range than we have in fact covered
if (ensurePartialDeps == Add)
Invariants.checkState(status() != Accepted && status() != AcceptedInvalidate);
// validate new partial txn
if (!validate(ensurePartialTxn, existingRanges, additionalRanges, covers(partialTxn()), covers(partialTxn), "txn", partialTxn))
return false;
if (partialTxn != null && txnId().rw() != partialTxn.kind())
throw new IllegalArgumentException("Transaction has different kind to its TxnId");
if (shard.isHome() && ensurePartialTxn != Ignore)
{
// on the home shard either the stored or the supplied txn must satisfy hasQuery(...)
if (!hasQuery(partialTxn()) && !hasQuery(partialTxn))
throw new IllegalStateException();
}
return validate(ensurePartialDeps, existingRanges, additionalRanges, covers(partialDeps()), covers(partialDeps), "deps", partialDeps);
}
/**
 * Store the supplied route, partialTxn and partialDeps on this command, restricted to the union of
 * {@code existingRanges} and {@code additionalRanges}.
 *
 * No coverage checks are performed here, and a null partialTxn/partialDeps is only tolerated for the Add action.
 * NOTE(review): presumably callers are expected to have run validate(...) with the same arguments first - confirm.
 *
 * @param safeStore         store with which the txn's keys/ranges are registered against this command
 * @param existingRanges    first set of ranges; combined with additionalRanges to slice the inputs
 * @param additionalRanges  second set of ranges; combined with existingRanges to slice the inputs
 * @param shard             decides whether the route is merged (progress shard) or sliced, and is passed to txn slicing
 * @param route             route to merge or slice
 * @param partialTxn        txn to add/set; may be null only for Add
 * @param ensurePartialTxn  Add, Set or TrySet update the txn; any other value leaves it untouched
 * @param partialDeps       deps to add/set; may be null only for Add
 * @param ensurePartialDeps Add, Set or TrySet update the deps; any other value leaves them untouched
 */
private void set(SafeCommandStore safeStore,
Ranges existingRanges, Ranges additionalRanges, ProgressShard shard, Route<?> route,
@Nullable PartialTxn partialTxn, EnsureAction ensurePartialTxn,
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
Invariants.checkState(progressKey() != null);
Ranges allRanges = existingRanges.with(additionalRanges);
// a progress shard merges the supplied route into the stored one; any other shard stores only the slice for allRanges
if (shard.isProgress()) setRoute(Route.merge(route(), (Route)route));
else setRoute(route.slice(allRanges));
// no default case: actions other than Add/Set/TrySet leave the stored txn unchanged
switch (ensurePartialTxn)
{
case Add:
// nothing supplied, so nothing to add
if (partialTxn == null)
break;
if (partialTxn() != null)
{
// a txn is already stored: slice the new one and merge it into the stored one
partialTxn = partialTxn.slice(allRanges, shard.isHome());
// presumably visits the keys/ranges of the new txn that are absent from the stored txn's keys - verify against Routables.foldlMissing
Routables.foldlMissing((Seekables)partialTxn.keys(), partialTxn().keys(), (keyOrRange, p, v, i) -> {
// TODO (expected, efficiency): we may register the same ranges more than once
safeStore.register(keyOrRange, allRanges, this);
return v;
}, 0, 0, 1);
this.setPartialTxn(partialTxn().with(partialTxn));
break;
}
// deliberate fall-through: with no stored txn, Add behaves like Set
case Set:
case TrySet:
// replace the stored txn with the sliced input and register all of its keys
setPartialTxn(partialTxn = partialTxn.slice(allRanges, shard.isHome()));
// TODO (expected, efficiency): we may register the same ranges more than once
// TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
safeStore.register(partialTxn.keys(), allRanges, this);
break;
}
// same structure as above, applied to the deps (no registration is involved)
switch (ensurePartialDeps)
{
case Add:
// nothing supplied, so nothing to add
if (partialDeps == null)
break;
if (partialDeps() != null)
{
// deps are already stored: merge in the sliced input
setPartialDeps(partialDeps().with(partialDeps.slice(allRanges)));
break;
}
// deliberate fall-through: with no stored deps, Add behaves like Set
case Set:
case TrySet:
setPartialDeps(partialDeps.slice(allRanges));
break;
}
}
/**
 * Decide whether the coverage offered by {@code adding} (and, for Check/Add, {@code existing}) is sufficient
 * for {@code existingRanges} and {@code additionalRanges} under the requested {@code action}.
 *
 * <ul>
 *   <li>Ignore: always true.</li>
 *   <li>TrySet: false if a non-null {@code adding} fails to cover either set of ranges; a null {@code adding}
 *       is handled exactly as Set.</li>
 *   <li>Set: {@code adding} must be non-null and cover both sets of ranges, otherwise an exception is thrown.</li>
 *   <li>Check/Add: coverage is taken from whichever of {@code adding}/{@code existing} is present (both combined
 *       when both are present); see the inline comments for which failures return false and which throw.</li>
 * </ul>
 *
 * {@code additionalRanges} is only tested when it is a different object to {@code existingRanges}.
 *
 * @param kind short label used only in exception messages
 * @param obj  object rendered in exception messages
 * @return true if coverage is sufficient, false if it is insufficient but tolerated
 */
private static boolean validate(EnsureAction action, Ranges existingRanges, Ranges additionalRanges,
                                Ranges existing, Ranges adding, String kind, Object obj)
{
    // identity comparison, exactly as the callers' ranges are compared throughout
    final boolean distinctAdditional = existingRanges != additionalRanges;

    switch (action)
    {
        case Ignore:
            return true;

        case TrySet:
            if (adding != null)
            {
                if (!adding.containsAll(existingRanges))
                    return false;
                return !distinctAdditional || adding.containsAll(additionalRanges);
            }
            // nothing supplied: deliberately fall through so Set's invariant check applies
        case Set:
            // failing any of these tests is always an illegal state
            Invariants.checkState(adding != null);
            if (!adding.containsAll(existingRanges))
                throw new IllegalArgumentException("Incomplete " + kind + " (" + obj + ") provided; does not cover " + existingRanges);
            if (distinctAdditional && !adding.containsAll(additionalRanges))
                throw new IllegalArgumentException("Incomplete " + kind + " (" + obj + ") provided; does not cover " + additionalRanges);
            return true;

        case Check:
        case Add:
        {
            // with neither source of coverage there is nothing to validate against
            if (adding == null && existing == null)
                return false;

            final Ranges available;
            if (adding == null)
                available = existing;
            else if (existing == null)
                available = adding;
            else
                available = adding.with(existing);

            final boolean coversExisting = available.containsAll(existingRanges);
            if (adding != null && existing == null)
            {
                // only new data is present: insufficient coverage is tolerated
                if (!coversExisting)
                    return false;
            }
            else
            {
                // stored data participates: it must already cover existingRanges
                Invariants.checkState(coversExisting);
            }

            if (!distinctAdditional || available.containsAll(additionalRanges))
                return true;

            // additionalRanges is not covered: Check tolerates this, Add does not
            if (action == Check)
                return false;
            if (adding == null)
                throw new IllegalArgumentException("Missing additional " + kind + "; existing does not cover " + additionalRanges.difference(existingRanges));
            throw new IllegalArgumentException("Incomplete additional " + kind + " (" + obj + ") provided; does not cover " + additionalRanges.difference(existingRanges));
        }

        default:
            throw new IllegalStateException();
    }
}
// TODO (low priority, progress): callers should try to consult the local progress shard (if any) to obtain the full set of keys owned locally
/**
 * Best available route for this command.
 *
 * @return the stored route if there is one; otherwise an empty partial route built from the txn's domain and
 *         the home key; or null when neither a route nor a home key is known
 */
public Route<?> someRoute()
{
    if (route() == null)
    {
        if (homeKey() == null)
            return null;
        return PartialRoute.empty(txnId().domain(), homeKey());
    }
    return route();
}
/**
 * @return the maximal unseekables of {@link #someRoute()}, or null when no route is available
 */
public Unseekables<?, ?> maxUnseekables()
{
    final Route<?> known = someRoute();
    return known == null ? null : known.toMaximalUnseekables();
}
/**
 * Test whether the ranges the command store reports for an epoch contain a key.
 *
 * @param safeStore store whose ranges are consulted
 * @param epoch     epoch at which to look up the store's ranges
 * @param someKey   key to test
 * @return true iff this commandStore owns the given key on the given epoch
 */
public boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey)
{
    Ranges ownedAtEpoch = safeStore.ranges().at(epoch);
    return ownedAtEpoch.contains(someKey);
}
/**
 * Invoke the listener's {@code onChange} callback with the given store and this command.
 *
 * @param safeStore store passed through to the listener
 * @param listener  listener to notify
 */
@Override
public void accept(SafeCommandStore safeStore, CommandListener listener)
{
listener.onChange(safeStore, this);
}
/**
 * @return a one-line summary listing txnId, status, partialTxn, executeAt and partialDeps
 */
@Override
public String toString()
{
    StringBuilder text = new StringBuilder("Command{");
    text.append("txnId=").append(txnId());
    text.append(", status=").append(status());
    text.append(", partialTxn=").append(partialTxn());
    text.append(", executeAt=").append(executeAt());
    text.append(", partialDeps=").append(partialDeps());
    text.append('}');
    return text.toString();
}
/**
 * Null-safe accessor for a txn's covering ranges.
 *
 * @return {@code txn.covering()}, or null when {@code txn} is null
 */
private static Ranges covers(@Nullable PartialTxn txn)
{
    if (txn == null)
        return null;
    return txn.covering();
}
/**
 * Null-safe accessor for the covering ranges of a set of deps.
 *
 * @return {@code deps.covering}, or null when {@code deps} is null
 */
private static Ranges covers(@Nullable PartialDeps deps)
{
    if (deps == null)
        return null;
    return deps.covering;
}
/**
 * @return true only when {@code txn} is non-null and its query is non-null
 */
private static boolean hasQuery(PartialTxn txn)
{
    if (txn == null)
        return false;
    return txn.query() != null;
}
// TODO (low priority, API): this is an ugly hack, need to encode progress/homeKey/Route state combinations much more clearly
// (perhaps introduce encapsulating class representing each possible arrangement)
/**
 * A {@link RoutingKey} implementation that cannot be compared or converted to a range: both overridden
 * methods unconditionally throw {@link UnsupportedOperationException}.
 *
 * NOTE(review): presumably used only as a marker value meaning "no progress key"; its usages are outside
 * this part of the file - confirm.
 */
static class NoProgressKey implements RoutingKey
{
/** Always throws; this key has no ordering. */
@Override
public int compareTo(@Nonnull RoutableKey that)
{
throw new UnsupportedOperationException();
}
/** Always throws; this key has no range representation. */
@Override
public Range asRange()
{
throw new UnsupportedOperationException();
}
}
// shared NoProgressKey instance; NOTE(review): presumably compared by identity by its users - confirm
private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
}