// blob: c6069dc4850e425be4b0c8a4919327b518c4dbdb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.coordinate.*;
import accord.local.*;
import accord.local.Node.Id;
import accord.local.Status.Known;
import accord.messages.Callback;
import accord.messages.InformDurable;
import accord.messages.SimpleReply;
import accord.primitives.*;
import accord.topology.Topologies;
import accord.utils.Invariants;
import org.apache.cassandra.utils.concurrent.Future;
import static accord.api.ProgressLog.ProgressShard.Home;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.coordinate.InformHomeOfTxn.inform;
import static accord.impl.SimpleProgressLog.CoordinateStatus.ReadyToExecute;
import static accord.impl.SimpleProgressLog.CoordinateStatus.Uncommitted;
import static accord.impl.SimpleProgressLog.DisseminateStatus.NotExecuted;
import static accord.impl.SimpleProgressLog.Progress.Done;
import static accord.impl.SimpleProgressLog.Progress.Expected;
import static accord.impl.SimpleProgressLog.Progress.Investigating;
import static accord.impl.SimpleProgressLog.Progress.NoProgress;
import static accord.impl.SimpleProgressLog.Progress.NoneExpected;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.Status.Durability.Durable;
import static accord.local.Status.Known.Nothing;
import static accord.local.Status.PreApplied;
import static accord.local.Status.PreCommitted;
import static accord.primitives.Route.isFullRoute;
// TODO: consider propagating invalidations in the same way as we do applied
public class SimpleProgressLog implements ProgressLog.Factory
{
/**
 * Scheduling state of a single monitor (see {@code Monitoring.setProgress} and {@code Monitoring.shouldRun}).
 * <ul>
 *   <li>{@code Expected} and {@code NoProgress} are the only values for which a monitor is kept queued;</li>
 *   <li>a queued {@code Expected} monitor is demoted to {@code NoProgress} the first time it is polled,
 *       and a {@code NoProgress} monitor is dequeued and run when polled;</li>
 *   <li>{@code NoneExpected}, {@code Investigating} and {@code Done} dequeue the monitor;</li>
 *   <li>once {@code Done}, {@code setProgress} refuses any further change.</li>
 * </ul>
 * The declaration order is preserved from the original single-line declaration.
 */
enum Progress
{
    NoneExpected,
    Expected,
    NoProgress,
    Investigating,
    Done
}
/**
 * Coordination phase tracked for a transaction; constants are declared in increasing order
 * and the helper predicates below rely on that declaration order.
 */
enum CoordinateStatus
{
    NotWitnessed,
    Uncommitted,
    Committed,
    ReadyToExecute,
    Done;

    /**
     * @return true for every status declared up to and including {@code ReadyToExecute}
     */
    boolean isAtMostReadyToExecute()
    {
        return ReadyToExecute.compareTo(this) >= 0;
    }

    /**
     * @return true for {@code Committed} and every status declared after it
     */
    boolean isAtLeastCommitted()
    {
        return Committed.compareTo(this) <= 0;
    }
}
/**
 * Phase of the durability dissemination tracked by {@code DisseminateState}:
 * it starts at {@code NotExecuted}, becomes {@code Durable} when durability is reported,
 * and {@code Done} once no node remains in the not-aware set.
 */
enum DisseminateStatus
{
    NotExecuted,
    Durable,
    Done
}
// the node this factory was created for; the nested states use it for topology lookups, messaging and scheduling
final Node node;
// every Instance returned by create(CommandStore), appended in creation order
final List<Instance> instances = new CopyOnWriteArrayList<>();
/**
 * Creates a progress log factory bound to the given node.
 *
 * @param node the node stored in {@link #node}
 */
public SimpleProgressLog(Node node)
{
this.node = node;
}
class Instance extends IntrusiveLinkedList<Instance.State.Monitoring> implements ProgressLog, Runnable
{
class State
{
/**
 * Base class of every per-transaction monitor. A monitor is a node of the enclosing intrusive list
 * and is kept linked only while its progress is {@code Expected} or {@code NoProgress}.
 */
abstract class Monitoring extends IntrusiveLinkedListNode
{
    private Progress progress = NoneExpected;

    /**
     * Records the new progress and links or unlinks this monitor accordingly.
     *
     * @param newProgress the progress to record
     * @throws AssertionError if {@code newProgress} is not a known constant
     */
    void setProgress(Progress newProgress)
    {
        // Done is terminal: the invariant check rejects any transition out of it
        Invariants.checkState(progress != Done);
        progress = newProgress;
        switch (newProgress)
        {
            case Expected:
            case NoProgress:
                // only enqueue if not already linked
                if (isFree())
                    addFirst(this);
                break;
            case NoneExpected:
            case Investigating:
            case Done:
                remove();
                Invariants.paranoid(isFree());
                break;
            default:
                throw new AssertionError();
        }
    }

    /**
     * Polls this monitor. An {@code Expected} monitor is given one more interval (it becomes
     * {@code NoProgress} and false is returned); a {@code NoProgress} monitor is unlinked and
     * true is returned.
     *
     * @return true if {@link #run(Command)} should now be invoked
     * @throws IllegalStateException if the progress is {@code NoneExpected}, {@code Investigating} or {@code Done}
     */
    boolean shouldRun()
    {
        switch (progress)
        {
            case NoProgress:
                remove();
                return true;
            case Expected:
                Invariants.paranoid(!isFree());
                progress = NoProgress;
                return false;
            case NoneExpected:
            case Investigating:
            case Done:
                throw new IllegalStateException();
            default:
                throw new AssertionError();
        }
    }

    /**
     * Attempts to make progress for the given command.
     *
     * @param command the command this monitor's transaction id resolves to
     */
    abstract void run(Command command);

    /**
     * @return the most recently recorded progress
     */
    Progress progress()
    {
        return progress;
    }

    /**
     * @return the transaction id of the enclosing {@code State}
     */
    TxnId txnId()
    {
        return State.this.txnId;
    }
}
// exists only on home shard
class CoordinateState extends Monitoring
{
CoordinateStatus status = CoordinateStatus.NotWitnessed;
ProgressToken token = ProgressToken.NONE;
Object debugInvestigating;
void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
{
ensureAtLeast(newStatus, newProgress);
updateMax(command);
}
void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
{
if (newStatus.compareTo(status) > 0)
{
status = newStatus;
setProgress(newProgress);
}
}
void updateMax(Command command)
{
token = token.merge(new ProgressToken(command.durability(), command.status(), command.promised(), command.accepted()));
}
void updateMax(ProgressToken ok)
{
// TODO: perhaps set localProgress back to Waiting if Investigating and we update anything?
token = token.merge(ok);
}
void durableGlobal()
{
switch (status)
{
default: throw new IllegalStateException();
case NotWitnessed:
case Uncommitted:
case Committed:
case ReadyToExecute:
status = CoordinateStatus.Done;
setProgress(Done);
case Done:
}
}
@Override
void run(Command command)
{
setProgress(Investigating);
switch (status)
{
default: throw new AssertionError();
case NotWitnessed: // can't make progress if we haven't witnessed it yet
case Committed: // can't make progress if we aren't yet ReadyToExecute
case Done: // shouldn't be trying to make progress, as we're done
throw new IllegalStateException();
case Uncommitted:
case ReadyToExecute:
{
if (status.isAtLeastCommitted() && command.durability().isDurable())
{
// must also be committed, as at the time of writing we do not guarantee dissemination of Commit
// records to the home shard, so we only know the executeAt shards will have witnessed this
// if the home shard is at an earlier phase, it must run recovery
long epoch = command.executeAt().epoch;
node.withEpoch(epoch, () -> debugInvestigating = FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, (success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
// should have found enough information to apply the result, but in case we did not reset progress
if (progress() == Investigating)
setProgress(Expected);
});
}));
}
else
{
RoutingKey homeKey = command.homeKey();
node.withEpoch(txnId.epoch, () -> {
Future<? extends Outcome> recover = node.maybeRecover(txnId, homeKey, command.route(), token);
recover.addCallback((success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
if (status.isAtMostReadyToExecute() && progress() == Investigating)
{
setProgress(Expected);
if (fail != null)
return;
ProgressToken token = success.asProgressToken();
// TODO: avoid returning null (need to change semantics here in this case, though, as Recover doesn't return CheckStatusOk)
if (token.durability.isDurable())
{
commandStore.execute(contextFor(txnId), safeStore -> {
Command cmd = safeStore.command(txnId);
cmd.setDurability(safeStore, token.durability, homeKey, null);
safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
}).addCallback(commandStore.agent());
}
updateMax(token);
}
});
});
debugInvestigating = recover;
});
}
}
}
}
@Override
public String toString()
{
return "{" + status + ',' + progress() + '}';
}
}
// exists only on home shard
class DisseminateState extends State.Monitoring
{
class CoordinateAwareness implements Callback<SimpleReply>
{
@Override
public void onSuccess(Id from, SimpleReply reply)
{
// TODO: callbacks should be associated with a commandStore for processing to avoid this
commandStore.execute(PreLoadContext.empty(), ignore -> {
if (progress() == Done)
return;
notAwareOfDurability.remove(from);
maybeDone();
});
}
@Override
public void onFailure(Id from, Throwable failure)
{
}
@Override
public void onCallbackFailure(Id from, Throwable failure)
{
}
}
DisseminateStatus status = NotExecuted;
Set<Id> notAwareOfDurability; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int
Set<Id> notPersisted; // TODO: use Agrona's IntHashSet as soon as Node.Id switches from long to int
List<Runnable> whenReady;
CoordinateAwareness investigating;
private void whenReady(Node node, Command command, Runnable runnable)
{
if (notAwareOfDurability != null || maybeReady(node, command))
{
runnable.run();
}
else
{
if (whenReady == null)
whenReady = new ArrayList<>();
whenReady.add(runnable);
}
}
private void whenReady(Runnable runnable)
{
if (notAwareOfDurability != null)
{
runnable.run();
}
else
{
if (whenReady == null)
whenReady = new ArrayList<>();
whenReady.add(runnable);
}
}
// must know the epoch information, and have a valid Route
private boolean maybeReady(Node node, Command command)
{
if (!command.status().hasBeen(Status.PreCommitted))
return false;
if (!isFullRoute(command.route()))
return false;
if (!node.topology().hasEpoch(command.executeAt().epoch))
return false;
Topologies topology = node.topology().preciseEpochs(command.route(), command.txnId().epoch, command.executeAt().epoch);
notAwareOfDurability = topology.copyOfNodes();
notPersisted = topology.copyOfNodes();
if (whenReady != null)
{
whenReady.forEach(Runnable::run);
whenReady = null;
}
return true;
}
private void maybeDone()
{
if (progress() != Done && notAwareOfDurability.isEmpty())
{
status = DisseminateStatus.Done;
setProgress(Done);
}
}
void durableGlobal(Node node, Command command, @Nullable Set<Id> persistedOn)
{
if (status == DisseminateStatus.Done)
return;
status = DisseminateStatus.Durable;
setProgress(Expected);
if (persistedOn == null)
return;
whenReady(node, command, () -> {
notPersisted.removeAll(persistedOn);
notAwareOfDurability.removeAll(persistedOn);
maybeDone();
});
}
void durableLocal(Node node)
{
if (status == DisseminateStatus.Done)
return;
status = DisseminateStatus.Durable;
setProgress(Expected);
whenReady(() -> {
notPersisted.remove(node.id());
notAwareOfDurability.remove(node.id());
maybeDone();
});
}
@Override
void run(Command command)
{
switch (status)
{
default: throw new IllegalStateException();
case NotExecuted:
case Done:
return;
case Durable:
}
if (notAwareOfDurability == null && !maybeReady(node, command))
return;
setProgress(Investigating);
if (notAwareOfDurability.isEmpty())
{
// TODO: also track actual durability
status = DisseminateStatus.Done;
setProgress(Done);
return;
}
FullRoute<?> route = Route.castToFullRoute(command.route());
Timestamp executeAt = command.executeAt();
investigating = new CoordinateAwareness();
Topologies topologies = node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
node.send(notAwareOfDurability, to -> new InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating);
}
@Override
public String toString()
{
return "{" + status + ',' + progress() + '}';
}
}
/**
 * Monitors a transaction that something is waiting on: tracks the knowledge level being waited
 * for and the keys/ranges involved, and fetches (or invalidates) the transaction when polled.
 */
class BlockingState extends State.Monitoring
{
// the knowledge level we are waiting for; grows via merge() in recordBlocking
Known blockedUntil = Nothing;
// union of every blockedOn collection reported so far; null until the first report
Unseekables<?, ?> blockedOn;
// handle of the most recently started fetch/invalidate; only ever written here, kept for debugging
Object debugInvestigating;
/**
 * Records a (possibly additional) wait on this transaction.
 *
 * @param blockedUntil the knowledge level the waiter needs
 * @param blockedOn    non-empty collection the waiter is blocked on; merged into {@link #blockedOn}
 */
void recordBlocking(Known blockedUntil, Unseekables<?, ?> blockedOn)
{
Invariants.checkState(!blockedOn.isEmpty());
if (this.blockedOn == null) this.blockedOn = blockedOn;
else this.blockedOn = Unseekables.merge(this.blockedOn, (Unseekables)blockedOn);
// only re-arm the monitor when the request is not already covered by what we were waiting for
if (!blockedUntil.isSatisfiedBy(this.blockedUntil))
{
this.blockedUntil = this.blockedUntil.merge(blockedUntil);
setProgress(Expected);
}
}
/**
 * Stops expecting progress if the given knowledge satisfies {@link #blockedUntil}.
 */
void record(Known known)
{
if (blockedUntil.isSatisfiedBy(known))
setProgress(NoneExpected);
}
/**
 * If the command already has the awaited knowledge, stops monitoring; otherwise starts a fetch
 * for it and, on completion, either records what was learnt or attempts invalidation.
 */
@Override
void run(Command command)
{
if (command.has(blockedUntil))
{
setProgress(NoneExpected);
return;
}
setProgress(Investigating);
// first make sure we have enough information to obtain the command locally
Timestamp executeAt = command.hasBeen(PreCommitted) ? command.executeAt() : null;
// use the executeAt epoch when it is known, otherwise the txnId epoch
long srcEpoch = (executeAt != null ? executeAt : txnId).epoch;
// TODO: compute fromEpoch, the epoch we already have this txn replicated until
long toEpoch = Math.max(srcEpoch, node.topology().epoch());
Unseekables<?, ?> someKeys = unseekables(command);
BiConsumer<Known, Throwable> callback = (success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
// something else changed our progress while the fetch was in flight; leave it alone
if (progress() != Investigating)
return;
setProgress(Expected);
if (fail == null)
{
// without a known definition we try to invalidate; otherwise record what we learnt
if (!success.isDefinitionKnown()) invalidate(node, txnId, someKeys);
else record(success);
}
});
};
node.withEpoch(toEpoch, () -> {
debugInvestigating = FetchData.fetch(blockedUntil, node, txnId, someKeys, executeAt, toEpoch, callback);
});
}
// Merges the command's route with the recorded blockedOn collection.
private Unseekables<?, ?> unseekables(Command command)
{
return Unseekables.merge((Route)command.route(), blockedOn);
}
// Starts an invalidation; on completion resets progress to Expected, and to Done if the result is durable.
private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys)
{
setProgress(Investigating);
// TODO (RangeTxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
// prefer the route's home key; otherwise take a routing key intersecting the first element
RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey();
someKeys = someKeys.with(someKey);
debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
if (progress() != Investigating)
return;
setProgress(Expected);
if (fail == null && success.asProgressToken().durability.isDurable())
setProgress(Done);
});
});
}
@Override
public String toString()
{
return progress().toString();
}
}
/**
 * Monitor created for a transaction on a shard that is not its home shard; it starts out
 * expecting progress and, when polled, informs the home shard of the transaction.
 */
class NonHomeState extends State.Monitoring
{
    NonHomeState()
    {
        setProgress(Expected);
    }

    /**
     * Marks this monitor {@code Done}; a no-op if it already is.
     */
    void setSafe()
    {
        if (progress() == Done)
            return;

        setProgress(Done);
    }

    @Override
    void run(Command command)
    {
        // make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress
        Future<Void> informed = inform(node, txnId, command.homeKey());
        informed.addCallback((success, fail) -> {
            commandStore.execute(PreLoadContext.empty(), ignore -> {
                if (progress() == Done)
                    return;

                // retry later on failure, otherwise we are finished
                if (fail == null)
                    setProgress(Done);
                else
                    setProgress(Expected);
            });
        });
    }

    @Override
    public String toString()
    {
        if (progress() == Done)
            return "Safe";
        return "Unsafe";
    }
}
// the transaction all of the monitors below belong to
final TxnId txnId;
// each monitor is created lazily by the accessor/mutator methods of this class and is null until then
CoordinateState coordinateState;
DisseminateState disseminateState;
NonHomeState nonHomeState;
BlockingState blockingState;
/**
 * Creates an empty state (no monitors yet) for the given transaction.
 *
 * @param txnId the transaction id stored in {@link #txnId}
 */
State(TxnId txnId)
{
this.txnId = txnId;
}
/**
 * Records that something is waiting on this transaction, creating the blocking monitor on first use.
 *
 * @param txnId      must equal this state's transaction id
 * @param waitingFor the knowledge level being waited for
 * @param routables  what the waiter is blocked on
 */
void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, ?> routables)
{
    Invariants.checkArgument(txnId.equals(this.txnId));
    BlockingState blocking = blockingState;
    if (blocking == null)
        blockingState = blocking = new BlockingState();
    blocking.recordBlocking(waitingFor, routables);
}
/**
 * @return the coordinate monitor of this transaction, created on first access
 */
CoordinateState local()
{
    CoordinateState current = coordinateState;
    if (current != null)
        return current;

    return coordinateState = new CoordinateState();
}
/**
 * @return the disseminate monitor of this transaction, created on first access
 */
DisseminateState global()
{
    DisseminateState current = disseminateState;
    if (current != null)
        return current;

    return disseminateState = new DisseminateState();
}
/**
 * Forwards to the coordinate monitor (created if absent), which advances its status if
 * {@code newStatus} is later and merges the command's state into its progress token.
 */
void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
{
local().ensureAtLeast(command, newStatus, newProgress);
}
/**
 * Forwards to the coordinate monitor (created if absent), which advances its status and
 * progress only if {@code newStatus} is later than its current status.
 */
void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
{
local().ensureAtLeast(newStatus, newProgress);
}
/**
 * Creates the non-home monitor if it does not exist yet; an existing monitor is left untouched.
 */
void touchNonHomeUnsafe()
{
    if (nonHomeState != null)
        return;

    nonHomeState = new NonHomeState();
}
/**
 * Marks the non-home monitor as safe, creating it first if necessary.
 */
void setSafe()
{
    // lazily creates nonHomeState when absent
    touchNonHomeUnsafe();
    nonHomeState.setSafe();
}
/**
 * Describes this state using the first monitor that exists, in the order coordinate,
 * non-home, blocking, disseminate.
 *
 * A state may hold only a disseminate monitor (e.g. when created through
 * {@code durableLocal(TxnId)}), so the blocking monitor must not be dereferenced
 * unconditionally; doing so used to throw a NullPointerException.
 *
 * @return the chosen monitor's description, or {@code "null"} if no monitor exists yet
 */
@Override
public String toString()
{
    if (coordinateState != null)
        return coordinateState.toString();
    if (nonHomeState != null)
        return nonHomeState.toString();
    if (blockingState != null)
        return blockingState.toString();
    // null-safe: yields "null" when no monitor has been created at all
    return String.valueOf(disseminateState);
}
}
// the command store this instance was created for; every deferred task in this class is submitted to it
final CommandStore commandStore;
// per-transaction monitoring state, created on demand by ensure(TxnId)
final Map<TxnId, State> stateMap = new HashMap<>();
// true while a poll of the monitor list has been scheduled and has not yet started running
boolean isScheduled;
/**
 * Creates a progress log bound to the given command store.
 *
 * @param commandStore the store kept in {@link #commandStore}
 */
Instance(CommandStore commandStore)
{
this.commandStore = commandStore;
}
/**
 * @return the state registered for {@code txnId}, registering a fresh one if none exists
 */
State ensure(TxnId txnId)
{
    State existing = stateMap.get(txnId);
    if (existing == null)
    {
        existing = new State(txnId);
        stateMap.put(txnId, existing);
    }
    return existing;
}
/**
 * @return {@code state} if it is non-null, otherwise the (possibly newly registered) state for {@code txnId}
 */
State ensure(TxnId txnId, State state)
{
    if (state != null)
        return state;

    return ensure(txnId);
}
/**
 * Shared handling for status notifications up to {@code ReadyToExecute}: informs any blocking
 * monitor of a commit, then on a progress shard either advances the home coordinate monitor
 * or marks the non-home monitor safe.
 *
 * @param shard must not be {@code Unsure}
 */
private void ensureSafeOrAtLeast(Command command, ProgressShard shard, CoordinateStatus newStatus, Progress newProgress)
{
    Invariants.checkState(shard != Unsure);
    assert newStatus.isAtMostReadyToExecute();

    TxnId txnId = command.txnId();
    State known = newStatus.isAtLeastCommitted() ? recordCommit(txnId) : null;
    if (!shard.isProgress())
        return;

    State state = ensure(txnId, known);
    if (shard.isHome())
        state.ensureAtLeast(command, newStatus, newProgress);
    else
        state.setSafe();
}
/**
 * Tells the blocking monitor of {@code txnId}, if one exists, that the Committed knowledge is now available.
 *
 * @return the existing state for {@code txnId}, or null if none is registered
 */
State recordCommit(TxnId txnId)
{
    State state = stateMap.get(txnId);
    if (state == null)
        return null;

    State.BlockingState blocking = state.blockingState;
    if (blocking != null)
        blocking.record(SaveStatus.Committed.known);
    return state;
}
/**
 * Tells the blocking monitor of {@code txnId}, if one exists, that the PreApplied knowledge is now available.
 *
 * @return the existing state for {@code txnId}, or null if none is registered
 */
State recordApply(TxnId txnId)
{
    State state = stateMap.get(txnId);
    if (state == null)
        return null;

    State.BlockingState blocking = state.blockingState;
    if (blocking != null)
        blocking.record(SaveStatus.PreApplied.known);
    return state;
}
/**
 * On the home shard, starts expecting progress for a transaction at (at least) the Uncommitted
 * status; on any other shard this is a no-op. {@code homeKey} is not used.
 */
@Override
public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard shard)
{
    if (!shard.isHome())
        return;

    State state = ensure(txnId);
    state.ensureAtLeast(Uncommitted, Expected);
}
/**
 * On a progress shard, starts monitoring the pre-accepted command: the home shard tracks it as
 * Uncommitted/Expected, any other progress shard creates its non-home monitor.
 *
 * @param shard must not be {@code Unsure}
 */
@Override
public void preaccepted(Command command, ProgressShard shard)
{
    Invariants.checkState(shard != Unsure);
    if (!shard.isProgress())
        return;

    State state = ensure(command.txnId());
    if (shard.isHome())
        state.ensureAtLeast(command, Uncommitted, Expected);
    else
        state.touchNonHomeUnsafe();
}
/**
 * Handles an accepted command by delegating to {@code ensureSafeOrAtLeast} with the
 * Uncommitted status and Expected progress.
 */
@Override
public void accepted(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
}
/**
 * Handles a committed command by delegating to {@code ensureSafeOrAtLeast} with the
 * Committed status and NoneExpected progress (which unlinks the coordinate monitor if the status advances).
 */
@Override
public void committed(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed, NoneExpected);
}
/**
 * Handles a command that is ready to execute by delegating to {@code ensureSafeOrAtLeast}
 * with the ReadyToExecute status and Expected progress.
 */
@Override
public void readyToExecute(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected);
}
/**
 * Handles an executed command: informs any blocking monitor that PreApplied knowledge is
 * available, then treats the command like one that is ReadyToExecute.
 */
@Override
public void executed(Command command, ProgressShard shard)
{
recordApply(command.txnId());
// this is the home shard's state ONLY, so we don't know it is fully durable locally
ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected);
}
/**
 * Handles an invalidated command: informs any blocking monitor, then on a progress shard either
 * finishes the home coordinate monitor or marks the non-home monitor safe.
 */
@Override
public void invalidated(Command command, ProgressShard shard)
{
    TxnId txnId = command.txnId();
    State state = recordApply(txnId);

    Invariants.checkState(shard == Home || state == null || state.coordinateState == null);

    // note: we permit Unsure here, so we check if we have any local home state
    if (!shard.isProgress())
        return;

    state = ensure(txnId, state);
    if (shard.isHome())
        state.ensureAtLeast(command, CoordinateStatus.Done, Done);
    else
        state.setSafe();
}
/**
 * Records on the disseminate monitor of {@code txnId} (created if absent) that the transaction
 * is durable on this node.
 */
@Override
public void durableLocal(TxnId txnId)
{
    ensure(txnId).global().durableLocal(node);
}
/**
 * Records that the command is durable: starts waiting for PreApplied knowledge if the command
 * has not reached PreApplied, finishes the coordinate monitor and updates the disseminate monitor.
 *
 * @param persistedOn nodes known to have persisted the command; may be null
 */
@Override
public void durable(Command command, @Nullable Set<Id> persistedOn)
{
    TxnId txnId = command.txnId();
    State state = ensure(txnId);

    boolean applied = command.status().hasBeen(PreApplied);
    if (!applied)
        state.recordBlocking(txnId, PreApplied.minKnown, command.maxUnseekables());

    state.local().durableGlobal();
    state.global().durableGlobal(node, command, persistedOn);
}
/**
 * Records that the transaction is durable elsewhere by starting to wait for PreApplied
 * knowledge of it on the given unseekables. {@code shard} is not used.
 */
@Override
public void durable(TxnId txnId, Unseekables<?, ?> unseekables, ProgressShard shard)
{
    // TODO (progress consider-prerelease): we can probably simplify things by requiring (empty) Apply messages to be sent also to the coordinating topology
    ensure(txnId).recordBlocking(txnId, PreApplied.minKnown, unseekables);
}
/**
 * Records that something is blocked on {@code blockedBy} until it reaches {@code blockedUntil},
 * creating the transaction's state and blocking monitor if necessary.
 *
 * @param blockedBy    the transaction being waited on
 * @param blockedUntil the knowledge level the waiter needs
 * @param blockedOn    what the waiter is blocked on; forwarded to the blocking monitor
 */
@Override
public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn)
{
// TODO (perf+ consider-prerelease): consider triggering a preemption of existing coordinator (if any) in some circumstances;
// today, an LWT can pre-empt more efficiently (i.e. instantly) a failed operation whereas Accord will
// wait for some progress interval before taking over; there is probably some middle ground where we trigger
// faster preemption once we're blocked on a transaction, while still offering some amount of time to complete.
// TODO (soon): forward to local progress shard for processing (if known)
// TODO (soon): if we are co-located with the home shard, don't need to do anything unless we're in a
// later topology that wasn't covered by its coordination
ensure(blockedBy).recordBlocking(blockedBy, blockedUntil, blockedOn);
}
/**
 * Links the monitor at the head of this list and makes sure a poll is scheduled.
 */
@Override
public void addFirst(State.Monitoring add)
{
super.addFirst(add);
ensureScheduled();
}
/**
 * Not supported by this list.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void addLast(State.Monitoring add)
{
throw new UnsupportedOperationException();
}
/**
 * Schedules a single poll of the monitor list 200ms from now, unless one is already pending.
 * The poll itself is executed through the command store.
 */
void ensureScheduled()
{
    if (!isScheduled)
    {
        isScheduled = true;
        node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()), 200L, TimeUnit.MILLISECONDS);
    }
}
/**
 * Polls every queued monitor once. A monitor whose {@code shouldRun()} returns true is handed to
 * the command store, where its progress is re-checked before {@code run(Command)} is invoked.
 * If monitors remain queued afterwards, another poll is scheduled.
 *
 * The deferred re-check first inspects {@code progress()}: between the poll and the deferred task
 * a callback may have moved the monitor to NoneExpected, Investigating or Done, and calling
 * {@code shouldRun()} in those states throws IllegalStateException instead of returning false.
 */
@Override
public void run()
{
    isScheduled = false;
    try
    {
        for (State.Monitoring run : this)
        {
            if (run.shouldRun())
            {
                commandStore.execute(contextFor(run.txnId()), safeStore -> {
                    switch (run.progress())
                    {
                        case Expected:
                        case NoProgress:
                            // still awaiting progress: Expected is demoted by shouldRun(), NoProgress is run
                            if (run.shouldRun())
                                run.run(safeStore.command(run.txnId()));
                            break;
                        default:
                            // completed, or already being investigated, by a callback: nothing to do
                            break;
                    }
                });
            }
        }
    }
    catch (Throwable t)
    {
        // keep polling even if a single monitor misbehaves; the failure is only reported
        t.printStackTrace();
    }
    finally
    {
        if (!isEmpty())
            ensureScheduled();
    }
}
}
/**
 * Creates the progress log for the given command store and remembers it in {@link #instances}.
 *
 * @param commandStore the store the new instance is bound to
 * @return the newly created instance
 */
@Override
public Instance create(CommandStore commandStore)
{
    Instance created = new Instance(commandStore);
    instances.add(created);
    return created;
}
}