blob: cfde8710ad6357869384d5e13d0819cbf15a4ae7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import accord.coordinate.*;
import accord.local.*;
import accord.local.Status.Known;
import accord.primitives.*;
import accord.utils.Invariants;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.local.Node.Id;
import accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus;
import accord.messages.Callback;
import accord.messages.InformDurable;
import accord.messages.SimpleReply;
import accord.topology.Topologies;
import org.apache.cassandra.utils.concurrent.Future;
import static accord.api.ProgressLog.ProgressShard.Home;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.coordinate.InformHomeOfTxn.inform;
import static accord.impl.SimpleProgressLog.DisseminateState.DisseminateStatus.NotExecuted;
import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.NotWitnessed;
import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.ReadyToExecute;
import static accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.Uncommitted;
import static accord.impl.SimpleProgressLog.NonHomeState.Safe;
import static accord.impl.SimpleProgressLog.NonHomeState.StillUnsafe;
import static accord.impl.SimpleProgressLog.NonHomeState.Unsafe;
import static accord.impl.SimpleProgressLog.Progress.Done;
import static accord.impl.SimpleProgressLog.Progress.Expected;
import static accord.impl.SimpleProgressLog.Progress.Investigating;
import static accord.impl.SimpleProgressLog.Progress.NoProgress;
import static accord.impl.SimpleProgressLog.Progress.NoneExpected;
import static accord.impl.SimpleProgressLog.Progress.advance;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.Status.Durability.Durable;
import static accord.local.Status.Known.Nothing;
import static accord.local.Status.PreApplied;
import static accord.local.Status.PreCommitted;
import static accord.primitives.Route.isFullRoute;
// TODO: consider propagating invalidations in the same way as we do applied
public class SimpleProgressLog implements Runnable, ProgressLog.Factory
{
/**
 * Progress marker kept by each of the per-transaction state holders in this file
 * (coordinate, disseminate and blocking state).
 */
enum Progress
{
    NoneExpected,
    Expected,
    NoProgress,
    Investigating,
    Done;

    /**
     * Returns the marker that follows {@code current} when an update pass finds nothing else to do.
     * {@code Expected} and {@code NoProgress} both become {@code NoProgress}; every other value is
     * returned unchanged.
     *
     * @param current the marker to advance; a null argument fails in the switch
     * @return the advanced marker
     */
    static Progress advance(Progress current)
    {
        switch (current)
        {
            case Expected:
            case NoProgress:
                return NoProgress;
            case NoneExpected:
            case Investigating:
            case Done:
                return current;
            default:
                throw new IllegalStateException();
        }
    }
}
// exists only on home shard
/**
 * Holds the coordination status of one transaction, the progress marker that gates
 * {@link #update}, and the merged ProgressToken observed so far.
 */
static class CoordinateState
{
/**
 * Coordination phases. isAtMost/isAtLeast compare with compareTo, so the declaration
 * order of the constants defines the ordering.
 */
enum CoordinateStatus
{
NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
// true if this constant is declared at or before the argument
boolean isAtMost(CoordinateStatus equalOrLessThan)
{
return compareTo(equalOrLessThan) <= 0;
}
// true if this constant is declared at or after the argument
boolean isAtLeast(CoordinateStatus equalOrGreaterThan)
{
return compareTo(equalOrGreaterThan) >= 0;
}
}
CoordinateStatus status = NotWitnessed;
Progress progress = NoneExpected;
ProgressToken token = ProgressToken.NONE;
// holds whatever the last FetchData.fetch / maybeRecover call returned; only written, never read, in this class
Object debugInvestigating;
/**
 * Raises status/progress via {@link #ensureAtLeast(CoordinateStatus, Progress)} and then
 * merges the command's current state into the token.
 */
void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
{
ensureAtLeast(newStatus, newProgress);
updateMax(command);
}
/**
 * Adopts newStatus and newProgress together, but only when newStatus is strictly later
 * than the current status; otherwise leaves both fields untouched.
 */
void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
{
if (newStatus.compareTo(status) > 0)
{
status = newStatus;
progress = newProgress;
}
}
// merges a token built from the command's durability, status, promised and accepted values into the stored token
void updateMax(Command command)
{
token = token.merge(new ProgressToken(command.durability(), command.status(), command.promised(), command.accepted()));
}
// merges an externally supplied token into the stored token
void updateMax(ProgressToken ok)
{
// TODO: perhaps set localProgress back to Waiting if Investigating and we update anything?
token = token.merge(ok);
}
/**
 * Moves any status short of Done to Done and sets progress to NoneExpected.
 * When already Done nothing changes.
 */
void durableGlobal()
{
switch (status)
{
default: throw new IllegalStateException();
case NotWitnessed:
case Uncommitted:
case Committed:
case ReadyToExecute:
status = CoordinateStatus.Done;
progress = NoneExpected;
// falls through into the empty Done case
case Done:
}
}
/**
 * One update pass. Unless progress is already NoProgress this only advances the marker
 * and returns. Otherwise progress becomes Investigating and, for the Uncommitted and
 * ReadyToExecute statuses, either FetchData.fetch or node.maybeRecover is invoked; the
 * other statuses throw IllegalStateException.
 */
void update(Node node, CommandStore commandStore, TxnId txnId, Command command)
{
if (progress != NoProgress)
{
progress = advance(progress);
return;
}
progress = Investigating;
switch (status)
{
default: throw new AssertionError();
case NotWitnessed: // can't make progress if we haven't witnessed it yet
case Committed: // can't make progress if we aren't yet ReadyToExecute
case Done: // shouldn't be trying to make progress, as we're done
throw new IllegalStateException();
case Uncommitted:
case ReadyToExecute:
{
if (status.isAtLeast(CoordinateStatus.Committed) && command.durability().isDurable())
{
// must also be committed, as at the time of writing we do not guarantee dissemination of Commit
// records to the home shard, so we only know the executeAt shards will have witnessed this
// if the home shard is at an earlier phase, it must run recovery
long epoch = command.executeAt().epoch;
node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> {
// should have found enough information to apply the result, but in case we did not reset progress
if (progress == Investigating)
progress = Expected;
}));
}
else
{
RoutingKey homeKey = command.homeKey();
node.withEpoch(txnId.epoch, () -> {
// the token passed here is the field; the callback below declares a local of the same name
Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token);
recover.addCallback((success, fail) -> {
// only act if nothing else moved this state on while the recovery was outstanding
if (status.isAtMost(ReadyToExecute) && progress == Investigating)
{
progress = Expected;
if (fail != null)
return;
// shadows the field: from here on "token" is the recovery outcome's token
ProgressToken token = success.asProgressToken();
// TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
if (token.durability.isDurable())
{
// record the durability on the local command and notify the store's progress log
commandStore.execute(contextFor(txnId), safeStore -> {
Command cmd = safeStore.command(txnId);
cmd.setDurability(safeStore, token.durability, homeKey, null);
safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
}).addCallback(commandStore.agent());
}
updateMax(token);
}
});
debugInvestigating = recover;
});
}
}
}
}
// renders as "{status,progress}"
@Override
public String toString()
{
return "{" + status + ',' + progress + '}';
}
}
/**
 * Tracks which nodes still have to be told that a transaction is durable, and sends
 * InformDurable messages to them from {@link #update}. Exists only on the home shard.
 */
static class DisseminateState
{
    enum DisseminateStatus { NotExecuted, Durable, Done }

    // TODO: thread safety (schedule on progress log executor)
    /**
     * Callback handed to node.send by update(): a successful reply removes the sender from
     * notAwareOfDurability; failures take no action.
     */
    class CoordinateAwareness implements Callback<SimpleReply>
    {
        @Override
        public void onSuccess(Id from, SimpleReply reply)
        {
            notAwareOfDurability.remove(from);
            maybeDone();
        }

        @Override
        public void onFailure(Id from, Throwable failure)
        {
        }

        @Override
        public void onCallbackFailure(Id from, Throwable failure)
        {
        }
    }

    DisseminateStatus status = NotExecuted;
    Progress progress = NoneExpected;
    // both sets stay null until maybeReady() succeeds
    Set<Id> notAwareOfDurability;
    Set<Id> notPersisted;
    // actions queued until the node sets above have been populated
    List<Runnable> whenReady;
    CoordinateAwareness investigating;

    /**
     * Runs the action now if the node sets exist (or can be created from the command right now),
     * otherwise queues it.
     */
    private void whenReady(Node node, Command command, Runnable runnable)
    {
        boolean ready = notAwareOfDurability != null || maybeReady(node, command);
        if (ready)
            runnable.run();
        else
            defer(runnable);
    }

    /**
     * Runs the action now if the node sets already exist, otherwise queues it.
     */
    private void whenReady(Runnable runnable)
    {
        if (notAwareOfDurability == null)
        {
            defer(runnable);
            return;
        }
        runnable.run();
    }

    // appends to the pending-action list, creating the list on first use
    private void defer(Runnable runnable)
    {
        if (whenReady == null)
            whenReady = new ArrayList<>();
        whenReady.add(runnable);
    }

    /**
     * Attempts to populate the node sets. Requires the command to have been PreCommitted, to carry
     * a full route, and the node's topology to have the executeAt epoch; returns false otherwise.
     * On success both sets are initialised from the topologies' nodes, any queued actions are run,
     * and true is returned.
     */
    // must know the epoch information, and have a valid Route
    private boolean maybeReady(Node node, Command command)
    {
        boolean known = command.status().hasBeen(Status.PreCommitted)
                        && isFullRoute(command.route())
                        && node.topology().hasEpoch(command.executeAt().epoch);
        if (!known)
            return false;

        Topologies topology = node.topology().preciseEpochs(command.route(), command.txnId().epoch, command.executeAt().epoch);
        notAwareOfDurability = topology.copyOfNodes();
        notPersisted = topology.copyOfNodes();

        List<Runnable> deferred = whenReady;
        if (deferred != null)
        {
            deferred.forEach(Runnable::run);
            whenReady = null;
        }
        return true;
    }

    // marks this state Done once no node remains in notAwareOfDurability
    private void maybeDone()
    {
        if (!notAwareOfDurability.isEmpty())
            return;

        status = DisseminateStatus.Done;
        progress = Done;
    }

    /**
     * Records that the transaction is durable. Unless already Done, status becomes Durable and
     * progress Expected; when persistedOn is supplied those nodes are removed from both node sets
     * (immediately, or once the sets can be created).
     */
    void durableGlobal(Node node, Command command, @Nullable Set<Id> persistedOn)
    {
        if (status == DisseminateStatus.Done)
            return;

        status = DisseminateStatus.Durable;
        progress = Expected;
        if (persistedOn != null)
        {
            whenReady(node, command, () -> {
                notPersisted.removeAll(persistedOn);
                notAwareOfDurability.removeAll(persistedOn);
                maybeDone();
            });
        }
    }

    /**
     * As durableGlobal, but removes only this node's own id from the node sets, and never
     * attempts to create the sets itself.
     */
    void durableLocal(Node node)
    {
        if (status == DisseminateStatus.Done)
            return;

        status = DisseminateStatus.Durable;
        progress = Expected;
        whenReady(() -> {
            notPersisted.remove(node.id());
            notAwareOfDurability.remove(node.id());
            maybeDone();
        });
    }

    /**
     * One update pass. Does nothing unless status is Durable and the node sets exist (or can be
     * created now). Unless progress is NoProgress this only advances the marker. Otherwise progress
     * becomes Investigating and, if any node remains in notAwareOfDurability, an InformDurable is
     * sent to each of them with a fresh CoordinateAwareness callback.
     */
    void update(Node node, TxnId txnId, Command command)
    {
        switch (status)
        {
            case Durable:
                break;
            case NotExecuted:
            case Done:
                return;
            default:
                throw new IllegalStateException();
        }

        if (notAwareOfDurability == null && !maybeReady(node, command))
            return;

        if (progress != NoProgress)
        {
            progress = advance(progress);
            return;
        }

        progress = Investigating;
        if (notAwareOfDurability.isEmpty())
        {
            // TODO: also track actual durability
            status = DisseminateStatus.Done;
            progress = Done;
            return;
        }

        FullRoute<?> route = Route.castToFullRoute(command.route());
        Timestamp executeAt = command.executeAt();
        investigating = new CoordinateAwareness();
        Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
        node.send(notAwareOfDurability, to -> new InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating);
    }

    // renders as "{status,progress}"
    @Override
    public String toString()
    {
        return "{" + status + "," + progress + "}";
    }
}
/**
 * Tracks what a transaction is being waited on for: the Known level that must be reached
 * ({@code blockedUntil}), the keys/ranges involved ({@code blockedOn}) and a progress marker
 * that decides when {@link #update} starts fetching or invalidating.
 */
static class BlockingState
{
    Known blockedUntil = Nothing;
    Progress progress = NoneExpected;
    Unseekables<?, ?> blockedOn;
    // holds whatever the last FetchData.fetch / Invalidate.invalidate call returned
    Object debugInvestigating;

    /**
     * Adds {@code blockedOn} to the tracked keys and, if the requested level is not already
     * covered by the tracked one, merges it in and sets progress to Expected.
     */
    void recordBlocking(Known blockedUntil, Unseekables<?, ?> blockedOn)
    {
        Invariants.checkState(!blockedOn.isEmpty());
        if (this.blockedOn != null)
            this.blockedOn = Unseekables.merge(this.blockedOn, (Unseekables)blockedOn);
        else
            this.blockedOn = blockedOn;

        if (blockedUntil.isSatisfiedBy(this.blockedUntil))
            return;

        this.blockedUntil = this.blockedUntil.merge(blockedUntil);
        progress = Expected;
    }

    /**
     * Notes that {@code known} has been reached; if that satisfies blockedUntil, no further
     * progress is expected.
     */
    void record(Known known)
    {
        if (!blockedUntil.isSatisfiedBy(known))
            return;
        progress = NoneExpected;
    }

    /**
     * One update pass. Unless progress is NoProgress this only advances the marker. If the command
     * already has blockedUntil, progress is reset to NoneExpected. Otherwise progress becomes
     * Investigating and FetchData.fetch is invoked once the node has the target epoch.
     */
    void update(Node node, TxnId txnId, Command command)
    {
        if (progress != NoProgress)
        {
            progress = advance(progress);
            return;
        }

        if (command.has(blockedUntil))
        {
            progress = NoneExpected;
            return;
        }

        progress = Investigating;

        // first make sure we have enough information to obtain the command locally
        final Timestamp executeAt = command.hasBeen(PreCommitted) ? command.executeAt() : null;
        final long srcEpoch = (executeAt != null ? executeAt : txnId).epoch;
        // TODO: compute fromEpoch, the epoch we already have this txn replicated until
        final long toEpoch = Math.max(srcEpoch, node.topology().epoch());
        final Unseekables<?, ?> someKeys = unseekables(command);

        BiConsumer<Known, Throwable> callback = (success, fail) -> onFetched(node, txnId, someKeys, success, fail);
        node.withEpoch(toEpoch, () -> {
            debugInvestigating = FetchData.fetch(blockedUntil, node, txnId, someKeys, executeAt, toEpoch, callback);
        });
    }

    // handles the fetch result: ignored if no longer Investigating; on success either records
    // the Known level reached or, if the definition is still unknown, starts an invalidation
    private void onFetched(Node node, TxnId txnId, Unseekables<?, ?> someKeys, Known success, Throwable fail)
    {
        if (progress != Investigating)
            return;

        progress = Expected;
        if (fail != null)
            return;

        if (success.isDefinitionKnown())
            record(success);
        else
            invalidate(node, txnId, someKeys);
    }

    // the command's route merged with the keys this state is blocked on
    private Unseekables<?, ?> unseekables(Command command)
    {
        return Unseekables.merge((Route)command.route(), blockedOn);
    }

    /**
     * Sets progress to Investigating and calls Invalidate.invalidate over {@code someKeys} plus one
     * routing key (the home key when someKeys is a Route, otherwise a key intersecting its first
     * element). The callback sets Expected, or Done when the result reports durability.
     */
    private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys)
    {
        progress = Investigating;
        // TODO (RangeTxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
        RoutingKey someKey;
        if (Route.isRoute(someKeys))
            someKey = (Route.castToRoute(someKeys)).homeKey();
        else
            someKey = someKeys.get(0).someIntersectingRoutingKey();

        Unseekables<?, ?> withKey = someKeys.with(someKey);
        debugInvestigating = Invalidate.invalidate(node, txnId, withKey, (success, fail) -> {
            if (progress != Investigating)
                return;

            progress = Expected;
            if (fail == null && success.asProgressToken().durability.isDurable())
                progress = Done;
        });
    }

    // renders as the progress marker's name
    @Override
    public String toString()
    {
        return progress.toString();
    }
}
/**
 * State kept for a transaction on a non-home progress shard. The declaration order matters:
 * State.ensureAtLeast compares values with compareTo and only ever moves to a later constant.
 */
enum NonHomeState
{
    Unsafe,
    StillUnsafe,
    Investigating,
    Safe
}
/**
 * Everything tracked for one transaction in one command store: the optional coordinate,
 * disseminate, non-home and blocking facets. Each facet is created lazily and stays null
 * until something records it.
 */
static class State
{
    final TxnId txnId;
    final CommandStore commandStore;

    CoordinateState coordinateState;
    DisseminateState disseminateState;
    NonHomeState nonHomeState;
    BlockingState blockingState;

    State(TxnId txnId, CommandStore commandStore)
    {
        this.txnId = txnId;
        this.commandStore = commandStore;
    }

    /**
     * Records that this transaction is being waited on, creating the blocking facet on first use.
     *
     * @param txnId must equal this state's txnId
     */
    void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, ?> unseekables)
    {
        Invariants.checkArgument(txnId.equals(this.txnId));
        if (blockingState == null)
            blockingState = new BlockingState();
        blockingState.recordBlocking(waitingFor, unseekables);
    }

    /**
     * Raises the non-home state to the given value if it is unset or currently earlier.
     */
    void ensureAtLeast(NonHomeState ensureAtLeast)
    {
        if (nonHomeState == null || nonHomeState.compareTo(ensureAtLeast) < 0)
            nonHomeState = ensureAtLeast;
    }

    /** Returns the coordinate facet, creating it on first use. */
    CoordinateState local()
    {
        if (coordinateState == null)
            coordinateState = new CoordinateState();
        return coordinateState;
    }

    /** Returns the disseminate facet, creating it on first use. */
    DisseminateState global()
    {
        if (disseminateState == null)
            disseminateState = new DisseminateState();
        return disseminateState;
    }

    /** Raises the coordinate facet's status and merges the command's state into its token. */
    void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
    {
        local().ensureAtLeast(command, newStatus, newProgress);
    }

    /** Raises the coordinate facet's status; txnId and homeKey are not used by this method. */
    void ensureAtLeast(TxnId txnId, RoutingKey homeKey, CoordinateStatus newStatus, Progress newProgress)
    {
        local().ensureAtLeast(newStatus, newProgress);
    }

    /**
     * One update pass for the non-home facet: Unsafe becomes StillUnsafe; StillUnsafe triggers an
     * inform of the home shard whose callback sets Safe on success or Unsafe on failure; Safe and
     * Investigating do nothing.
     */
    void updateNonHome(Node node, Command command)
    {
        switch (nonHomeState)
        {
            default: throw new IllegalStateException();
            case Safe:
            case Investigating:
                break;
            case Unsafe:
                nonHomeState = StillUnsafe;
                break;
            case StillUnsafe:
                // NOTE(review): nothing in this file assigns NonHomeState.Investigating, so the state
                // stays StillUnsafe while an inform is outstanding and each pass issues another one;
                // confirm whether that is intended
                // make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress
                Future<Void> inform = inform(node, txnId, command.homeKey());
                inform.addCallback((success, fail) -> {
                    if (nonHomeState == Safe)
                        return;
                    if (fail != null) nonHomeState = Unsafe;
                    else nonHomeState = Safe;
                });
                break;
        }
    }

    /**
     * Loads the command through the command store and runs one update pass on every facet that
     * exists. Failures of the submitted task are passed to the command store's agent.
     */
    void update(Node node)
    {
        PreLoadContext context = contextFor(txnId);
        commandStore.execute(context, safeStore -> {
            Command command = safeStore.command(txnId);
            if (blockingState != null)
                blockingState.update(node, txnId, command);
            if (coordinateState != null)
                coordinateState.update(node, safeStore.commandStore(), txnId, command);
            if (disseminateState != null)
                disseminateState.update(node, txnId, command);
            if (nonHomeState != null)
                updateNonHome(node, command);
        }).addCallback(commandStore.agent());
    }

    /**
     * Describes the first facet present, in the order coordinate, non-home, blocking, disseminate.
     * A state may hold only a disseminate facet (or none at all), so the trailing cases must not
     * assume the blocking facet exists; "{}" is returned when no facet has been created.
     */
    @Override
    public String toString()
    {
        if (coordinateState != null)
            return coordinateState.toString();
        if (nonHomeState != null)
            return nonHomeState.toString();
        if (blockingState != null)
            return blockingState.toString();
        if (disseminateState != null)
            return disseminateState.toString();
        return "{}";
    }
}
/** The node passed to every state update and used to schedule this log. */
final Node node;
/** Every Instance created by this factory; each Instance adds itself in its constructor. */
final List<Instance> instances = new CopyOnWriteArrayList<>();

/**
 * Creates the progress log factory and registers it with the node's scheduler so that
 * {@code run()} is invoked every 200 milliseconds.
 *
 * @param node the owning node
 */
public SimpleProgressLog(Node node)
{
    this.node = node;
    long periodMillis = 200L;
    node.scheduler().recurring(this, periodMillis, TimeUnit.MILLISECONDS);
}
class Instance implements ProgressLog
{
final CommandStore commandStore;
final Map<TxnId, State> stateMap = new HashMap<>();
Instance(CommandStore commandStore)
{
this.commandStore = commandStore;
instances.add(this);
}
State ensure(TxnId txnId)
{
return stateMap.computeIfAbsent(txnId, id -> new State(id, commandStore));
}
State ensure(TxnId txnId, State state)
{
return state != null ? state : ensure(txnId);
}
@Override
public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard)
{
if (shard.isHome())
ensure(txnId).ensureAtLeast(txnId, homeKey, Uncommitted, Expected);
}
@Override
public void preaccepted(Command command, ProgressShard shard)
{
Invariants.checkState(shard != Unsure);
if (shard.isProgress())
{
State state = ensure(command.txnId());
if (shard.isHome()) state.ensureAtLeast(command, Uncommitted, Expected);
else state.ensureAtLeast(NonHomeState.Unsafe);
}
}
State recordCommit(TxnId txnId)
{
State state = stateMap.get(txnId);
if (state != null && state.blockingState != null)
state.blockingState.record(SaveStatus.Committed.known);
return state;
}
State recordApply(TxnId txnId)
{
State state = stateMap.get(txnId);
if (state != null && state.blockingState != null)
state.blockingState.record(SaveStatus.PreApplied.known);
return state;
}
private void ensureSafeOrAtLeast(Command command, ProgressShard shard, CoordinateStatus newStatus, Progress newProgress)
{
Invariants.checkState(shard != Unsure);
State state = null;
assert newStatus.isAtMost(ReadyToExecute);
if (newStatus.isAtLeast(CoordinateStatus.Committed))
state = recordCommit(command.txnId());
if (shard.isProgress())
{
state = ensure(command.txnId(), state);
if (shard.isHome()) state.ensureAtLeast(command, newStatus, newProgress);
else ensure(command.txnId()).ensureAtLeast(Safe);
}
}
@Override
public void accepted(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
}
@Override
public void committed(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed, NoneExpected);
}
@Override
public void readyToExecute(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, CoordinateStatus.ReadyToExecute, Expected);
}
@Override
public void executed(Command command, ProgressShard shard)
{
recordApply(command.txnId());
// this is the home shard's state ONLY, so we don't know it is fully durable locally
ensureSafeOrAtLeast(command, shard, CoordinateStatus.ReadyToExecute, Expected);
}
@Override
public void invalidated(Command command, ProgressShard shard)
{
State state = recordApply(command.txnId());
Invariants.checkState(shard == Home || state == null || state.coordinateState == null);
// note: we permit Unsure here, so we check if we have any local home state
if (shard.isProgress())
{
state = ensure(command.txnId(), state);
if (shard.isHome()) state.ensureAtLeast(command, CoordinateStatus.Done, Done);
else ensure(command.txnId()).ensureAtLeast(Safe);
}
}
@Override
public void durableLocal(TxnId txnId)
{
State state = ensure(txnId);
state.global().durableLocal(node);
}
@Override
public void durable(Command command, @Nullable Set<Id> persistedOn)
{
State state = ensure(command.txnId());
if (!command.status().hasBeen(PreApplied))
state.recordBlocking(command.txnId(), PreApplied.minKnown, command.maxUnseekables());
state.local().durableGlobal();
state.global().durableGlobal(node, command, persistedOn);
}
@Override
public void durable(TxnId txnId, Unseekables<?, ?> unseekables, ProgressShard shard)
{
State state = ensure(txnId);
// TODO: we can probably simplify things by requiring (empty) Apply messages to be sent also to the coordinating topology
state.recordBlocking(txnId, PreApplied.minKnown, unseekables);
}
public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn)
{
// TODO (soon): forward to progress shard for processing (if known)
// TODO (soon): if we are co-located with the home shard, don't need to do anything unless we're in a
// later topology that wasn't covered by its coordination
ensure(blockedBy).recordBlocking(blockedBy, blockedUntil, blockedOn);
}
}
/**
 * The scheduled task: for every registered instance, takes a snapshot of its tracked states and
 * runs one update pass on each. Anything thrown by a state's update is handed to the node's
 * agent so that the remaining states are still visited.
 */
@Override
public void run()
{
    for (Instance instance : instances)
    {
        // TODO: we want to be able to poll others about pending dependencies to check forward progress,
        // as we don't know all dependencies locally (or perhaps any, at execution time) so we may
        // begin expecting forward progress too early
        List<State> snapshot = new ArrayList<>(instance.stateMap.values());
        for (State state : snapshot)
        {
            try
            {
                state.update(node);
            }
            catch (Throwable t)
            {
                node.agent().onUncaughtException(t);
            }
        }
    }
}
/**
 * Creates the progress log for one command store. The new Instance registers itself with this
 * factory in its constructor.
 *
 * @param commandStore the command store the returned log serves
 * @return a new Instance bound to this factory
 */
@Override
public ProgressLog create(CommandStore commandStore)
{
    return this.new Instance(commandStore);
}
}