blob: 575252d79767fb5a0d4a5bb4942cdcc003d4bc11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.ProgressLog;
import accord.coordinate.FetchData;
import accord.coordinate.Outcome;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.Commands;
import accord.local.Node;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
import accord.local.SaveStatus.LocalExecution;
import accord.local.Status;
import accord.local.Status.Known;
import accord.primitives.EpochSupplier;
import accord.primitives.Participants;
import accord.primitives.ProgressToken;
import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
import static accord.api.ProgressLog.ProgressShard.Unsure;
import static accord.coordinate.InformHomeOfTxn.inform;
import static accord.impl.SimpleProgressLog.CoordinateStatus.ReadyToExecute;
import static accord.impl.SimpleProgressLog.CoordinateStatus.Uncommitted;
import static accord.impl.SimpleProgressLog.Progress.Done;
import static accord.impl.SimpleProgressLog.Progress.Expected;
import static accord.impl.SimpleProgressLog.Progress.Investigating;
import static accord.impl.SimpleProgressLog.Progress.NoProgress;
import static accord.impl.SimpleProgressLog.Progress.NoneExpected;
import static accord.local.PreLoadContext.contextFor;
import static accord.local.PreLoadContext.empty;
import static accord.local.SaveStatus.LocalExecution.NotReady;
import static accord.local.SaveStatus.LocalExecution.WaitingToApply;
import static accord.local.Status.Durability.MajorityOrInvalidated;
import static accord.local.Status.PreApplied;
// TODO (desired, consider): consider propagating invalidations in the same way as we do applied
// TODO (expected): report long-lived recurring transactions / operations
public class SimpleProgressLog implements ProgressLog.Factory
{
private static final Logger logger = LoggerFactory.getLogger(SimpleProgressLog.class);
/**
 * Scheduling state of a single monitor.
 * In this file, setProgress() queues a monitor for {@code Expected} and {@code NoProgress} and unlinks it for
 * {@code NoneExpected}, {@code Investigating} and {@code Done}; shouldRun() downgrades {@code Expected} to
 * {@code NoProgress} and reports a monitor found in {@code NoProgress} as runnable.
 */
enum Progress { NoneExpected, Expected, NoProgress, Investigating, Done }
// Test hook: while true, Instance.run() logs a message and returns without visiting any queued monitor.
public static volatile boolean PAUSE_FOR_TEST = false;
/**
 * How far the home shard has observed a transaction progress; constants are declared in increasing order
 * and compared by that order.
 */
enum CoordinateStatus
{
    NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;

    /** @return true for every status up to and including {@code ReadyToExecute} */
    boolean isAtMostReadyToExecute()
    {
        return this.compareTo(ReadyToExecute) < 1;
    }

    /** @return true for {@code Committed} and every later status */
    boolean isAtLeastCommitted()
    {
        return Committed.compareTo(this) <= 0;
    }
}
// The node this factory belongs to; the instances use it for the agent, scheduler, local config,
// epoch waiting and recovery.
final Node node;
// Every Instance handed out by create(), in creation order.
final List<Instance> instances = new CopyOnWriteArrayList<>();
/**
 * Creates a factory bound to {@code node}.
 *
 * @param node the node on whose behalf progress is monitored
 */
public SimpleProgressLog(Node node)
{
this.node = node;
}
class Instance extends IntrusiveLinkedList<Instance.State.Monitoring> implements ProgressLog, Runnable
{
class State
{
/**
 * Common base of the per-transaction monitors owned by a {@code State}. A monitor is an intrusive list
 * node: {@link #setProgress} links it (via {@code addFirst}) when progress is {@code Expected} or
 * {@code NoProgress}, and unlinks it for every other value.
 */
abstract class Monitoring extends IntrusiveLinkedListNode
{
    private Progress progress = NoneExpected;

    /**
     * Records {@code newProgress} and links/unlinks this node accordingly.
     * Fails the state check if the monitor has already reached {@code Done}.
     */
    void setProgress(Progress newProgress)
    {
        Invariants.checkState(progress != Done);
        progress = newProgress;
        switch (newProgress)
        {
            case Expected:
            case NoProgress:
                // something is awaited: make sure the periodic pass can see this monitor
                if (isFree())
                    addFirst(this);
                break;
            case NoneExpected:
            case Investigating:
            case Done:
                // nothing for the periodic pass to do with this monitor right now
                remove();
                Invariants.paranoid(isFree());
                break;
            default:
                throw new AssertionError();
        }
    }

    /** Moves to {@code Done} unless already there. */
    void ensureDone()
    {
        if (progress == Done)
            return;
        setProgress(Done);
    }

    /**
     * Decides whether {@link #run} should be invoked now. A monitor seen in {@code Expected} is downgraded
     * to {@code NoProgress} and skipped; one seen in {@code NoProgress} is unlinked and reported runnable.
     *
     * @return true only when the monitor was in {@code NoProgress}
     * @throws IllegalStateException if the monitor is {@code Investigating}
     */
    boolean shouldRun()
    {
        switch (progress)
        {
            case NoProgress:
                remove();
                return true;
            case Expected:
                Invariants.paranoid(!isFree());
                progress = NoProgress;
                return false;
            case NoneExpected:
            case Done:
                return false;
            case Investigating:
                throw new IllegalStateException("Unexpected progress: " + progress);
            default:
                throw new AssertionError("Unexpected progress: " + progress);
        }
    }

    /** Takes whatever action this monitor uses to drive the transaction forward. */
    abstract void run(SafeCommandStore safeStore, SafeCommand safeCommand);

    /** @return the most recently recorded progress */
    Progress progress()
    {
        return progress;
    }

    /** @return the id of the transaction the enclosing {@code State} tracks */
    TxnId txnId()
    {
        return txnId;
    }
}
// exists only on home shard
// TODO (expected): should not take any prompt action if we're ourselves already coordinating the transaction
/**
 * Monitor that tracks how far coordination of the transaction has got and, when no progress is seen,
 * asks the node to attempt recovery.
 */
class CoordinateState extends Monitoring
{
    CoordinateStatus status = CoordinateStatus.NotWitnessed;
    ProgressToken token = ProgressToken.NONE;
    Object debugInvestigating;

    /** Advances status/progress as in the two-argument overload, then folds {@code command} into the token. */
    void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
    {
        ensureAtLeast(newStatus, newProgress);
        updateMax(command);
    }

    /** Adopts {@code newStatus} and {@code newProgress} only if {@code newStatus} is strictly later than the current status. */
    void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
    {
        if (newStatus.compareTo(status) <= 0)
            return;

        status = newStatus;
        setProgress(newProgress);
    }

    /** Merges the durability, status, promised and accepted values of {@code command} into the token. */
    void updateMax(Command command)
    {
        ProgressToken observed = new ProgressToken(command.durability(), command.status(), command.promised(), command.accepted());
        token = token.merge(observed);
    }

    /** Merges {@code ok} into the token. */
    void updateMax(ProgressToken ok)
    {
        // TODO (low priority): perhaps set localProgress back to Waiting if Investigating and we update anything?
        token = token.merge(ok);
    }

    /** Marks coordination as finished: status and progress both become Done (no-op if already Done). */
    void durableGlobal()
    {
        switch (status)
        {
            case Done:
                return;
            case NotWitnessed:
            case Uncommitted:
            case Committed:
            case ReadyToExecute:
                status = CoordinateStatus.Done;
                setProgress(Done);
                return;
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * Switches to Investigating and, unless the command or token already shows a durable/invalidated
     * outcome, asks the node to recover the transaction. When recovery completes the monitor returns to
     * Expected (if still investigating) and a successful outcome is merged into the token.
     *
     * @throws IllegalStateException if status is NotWitnessed, Committed or Done
     */
    @Override
    void run(SafeCommandStore safeStore, SafeCommand safeCommand)
    {
        Command command = safeCommand.current();
        Invariants.checkState(!safeStore.isTruncated(command), "Command %s is truncated", command);
        setProgress(Investigating);

        switch (status)
        {
            case ReadyToExecute:
                // TODO (expected): we should have already exited this loop if this condition is true
                if (command.durability().isDurableOrInvalidated())
                {
                    // should not reach here with an invalidated state
                    Invariants.checkState(command.durability().isDurable(), "Command %s is not durable", command);
                    durableGlobal();
                    return;
                }
                break; // otherwise handled exactly like Uncommitted below
            case Uncommitted:
                break;
            case NotWitnessed: // can't make progress if we haven't witnessed it yet
            case Committed: // can't make progress if we aren't yet ReadyToExecute
            case Done: // shouldn't be trying to make progress, as we're done
                throw new IllegalStateException("Unexpected status: " + status);
            default:
                throw new AssertionError("Unexpected status: " + status);
        }

        Invariants.checkState(token.status != Status.Invalidated, "ProgressToken is invalidated, but we have not cleared the ProgressLog");
        // TODO (expected): we should have already exited this loop if either of these conditions are true
        boolean commandSettled = command.durability().isDurableOrInvalidated();
        if (commandSettled || token.durability.isDurableOrInvalidated())
        {
            if (!commandSettled)
                Commands.setDurability(safeStore, safeCommand, MajorityOrInvalidated);
            durableGlobal();
            return;
        }

        Route<?> route = Invariants.nonNull(command.route()).withHomeKey();
        node.withEpoch(txnId.epoch(), () -> {
            AsyncResult<? extends Outcome> recover = node.maybeRecover(txnId, route, token);
            recover.addCallback((success, fail) -> {
                // TODO (expected): callback should be on safeStore, and should provide safeStore as a parameter
                commandStore.execute(contextFor(txnId), safeStore0 -> {
                    if (!status.isAtMostReadyToExecute() || progress() != Investigating)
                        return;

                    setProgress(Expected);
                    if (fail == null)
                        updateMax(success.asProgressToken());
                }).begin(node.agent());
            });
            debugInvestigating = recover;
        });
    }

    @Override
    public String toString()
    {
        return "{" + status + ',' + progress() + '}';
    }
}
/**
 * Monitor for a transaction that something local is waiting on. It remembers the furthest
 * {@code LocalExecution} point being waited for plus every route/participants it was told about, and
 * when run it fetches the missing information through {@code FetchData}.
 */
class BlockingState extends State.Monitoring
{
    LocalExecution blockedUntil = NotReady;
    Route<?> route;
    Participants<?> participants;
    Object debugInvestigating;

    /**
     * Merges the supplied route/participants into those already known and, if {@code blockedUntil} is
     * later than the current target, adopts it and sets progress to Expected.
     * At least one of {@code route} and {@code participants} must be non-null, and whichever is supplied
     * must be non-empty.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    void recordBlocking(LocalExecution blockedUntil, @Nullable Route<?> route, @Nullable Participants<?> participants)
    {
        Invariants.checkState(route != null || participants != null, "Route and participants are both undefined");
        Invariants.checkState(participants == null || !participants.isEmpty(), "participants is empty");
        Invariants.checkState(route == null || route.hasParticipants(), "Route %s does not have participants", route);

        this.route = Route.merge(this.route, (Route)route);
        this.participants = Participants.merge(this.participants, (Participants) participants);
        if (blockedUntil.compareTo(this.blockedUntil) > 0)
        {
            this.blockedUntil = blockedUntil;
            setProgress(Expected);
        }
    }

    /** Stops expecting progress once {@code known} satisfies the current target; ignored when already Done. */
    void record(Known known)
    {
        // invalidation coordination callback may fire
        // before invalidation is committed locally
        if (progress() == Done)
            return;

        if (blockedUntil.isSatisfiedBy(known))
            setProgress(NoneExpected);
    }

    /**
     * If the command has already reached the target, stops expecting progress; otherwise switches to
     * Investigating and starts a fetch, returning to Expected when the fetch calls back.
     */
    @Override
    void run(SafeCommandStore safeStore, SafeCommand safeCommand)
    {
        Command command = safeCommand.current();
        if (command.isAtLeast(blockedUntil))
        {
            setProgress(NoneExpected);
            return;
        }

        setProgress(Investigating);
        // first make sure we have enough information to obtain the command locally
        Timestamp executeAt = command.executeAtIfKnown();
        // we want to fetch a route if we have it, so that we can go to our neighbouring shards for info
        // (rather than the home shard, which may have GC'd its state if the result is durable)
        Unseekables<?> fetchKeys = maxContact(command);
        EpochSupplier forLocalEpoch = safeStore.ranges().latestEpochWithNewParticipants(txnId.epoch(), fetchKeys);
        BiConsumer<Known, Throwable> callback = (success, fail) -> {
            // TODO (expected): this should be invoked on this commandStore; also do not need to load txn unless in DEBUG mode
            commandStore.execute(contextFor(txnId), safeStore0 -> {
                if (progress() != Investigating)
                    return;

                setProgress(Expected);
                Invariants.checkState(fail != null || !blockedUntil.isSatisfiedBy(success.propagates()));
            }).begin(commandStore.agent());
        };

        node.withEpoch(blockedUntil.fetchEpoch(txnId, executeAt), () -> {
            debugInvestigating = FetchData.fetch(blockedUntil.requires, node, txnId, fetchKeys, forLocalEpoch, executeAt, callback);
        });
    }

    /**
     * Everything worth contacting: the command's route merged with the recorded route (with its home key),
     * merged with the recorded participants.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Unseekables<?> maxContact(Command command)
    {
        Route<?> route = Route.merge(command.route(), (Route)this.route);
        // withHomeKey() is applied once here; the original applied it a second time to its own result
        // NOTE(review): assumes withHomeKey() is idempotent - confirm against Route
        if (route != null) route = route.withHomeKey();
        return Unseekables.merge(route, (Unseekables)participants);
    }

    /** The participants of the merged route (if any) merged with the recorded participants. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Participants<?> maxParticipants(Command command)
    {
        Route<?> route = Route.merge(command.route(), (Route)this.route);
        return Participants.merge(route == null ? null : route.participants(), (Participants) participants);
    }

    @Override
    public String toString()
    {
        return progress().toString();
    }
}
/**
 * Monitor that starts out expecting progress and, when run, informs the home shard of the transaction.
 * It is "safe" once it reaches Done.
 */
class NonHomeState extends State.Monitoring
{
    NonHomeState()
    {
        setProgress(Expected);
    }

    /** Marks this monitor Done unless it already is. */
    void setSafe()
    {
        ensureDone();
    }

    /**
     * Sends an inform for the transaction along the command's route; on failure the monitor goes back to
     * Expected, on success it becomes Done. The result is ignored if the monitor is already Done.
     */
    @Override
    void run(SafeCommandStore safeStore, SafeCommand safeCommand)
    {
        Command command = safeCommand.current();
        // make sure a quorum of the home shard is aware of the transaction, so we can rely on it to ensure progress
        if (command.route() == null)
            throw new AssertionError(String.format("Attempted to inform but route is not known for command %s", command));

        AsyncChain<Void> informing = inform(node, txnId, command.route());
        informing.begin((success, fail) -> {
            commandStore.execute(empty(), ignore -> {
                if (progress() == Done)
                    return;

                if (fail != null)
                    setProgress(Expected);
                else
                    setProgress(Done);
            }).begin(commandStore.agent());
        });
    }

    @Override
    public String toString()
    {
        if (progress() == Done)
            return "Safe";
        return "Unsafe";
    }
}
// The transaction this state tracks.
final TxnId txnId;
// The three monitors below are created lazily by the methods of this class and may each be null.
CoordinateState coordinateState;
NonHomeState nonHomeState;
BlockingState blockingState;
/**
 * Creates an empty state (no monitors yet) for {@code txnId}.
 */
State(TxnId txnId)
{
this.txnId = txnId;
}
/**
 * Records that something is waiting for this transaction to reach {@code waitingFor}, creating the
 * blocking monitor on first use. {@code txnId} must equal this state's transaction id.
 */
void recordBlocking(TxnId txnId, LocalExecution waitingFor, Route<?> route, Participants<?> participants)
{
    Invariants.checkArgument(txnId.equals(this.txnId));
    BlockingState blocking = blockingState;
    if (blocking == null)
    {
        blocking = new BlockingState();
        blockingState = blocking;
    }
    blocking.recordBlocking(waitingFor, route, participants);
}
/** @return the coordination monitor, creating it on first use */
CoordinateState coordinate()
{
    CoordinateState existing = coordinateState;
    if (existing != null)
        return existing;

    CoordinateState created = new CoordinateState();
    coordinateState = created;
    return created;
}
/** Forwards to the coordination monitor (created if absent), including the command's details. */
void ensureAtLeast(Command command, CoordinateStatus newStatus, Progress newProgress)
{
    CoordinateState coordinating = coordinate();
    coordinating.ensureAtLeast(command, newStatus, newProgress);
}
/** Forwards to the coordination monitor (created if absent). */
void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
{
    CoordinateState coordinating = coordinate();
    coordinating.ensureAtLeast(newStatus, newProgress);
}
/** Creates the non-home monitor if there is none yet; an existing monitor is left untouched. */
void touchNonHomeUnsafe()
{
    if (nonHomeState != null)
        return;

    nonHomeState = new NonHomeState();
}
/** Marks the non-home monitor safe, creating it first if necessary. */
void setSafe()
{
    touchNonHomeUnsafe(); // guarantees nonHomeState is non-null
    nonHomeState.setSafe();
}
/**
 * Describes the first monitor present, in the order coordinate, non-home, blocking.
 * A state with no monitor at all (possible because monitors are created lazily) renders as
 * {@code "{}"} rather than throwing a NullPointerException.
 */
@Override
public String toString()
{
    if (coordinateState != null)
        return coordinateState.toString();
    if (nonHomeState != null)
        return nonHomeState.toString();
    if (blockingState != null)
        return blockingState.toString();
    return "{}";
}
}
// The command store this instance monitors; all follow-up work is submitted through it.
final CommandStore commandStore;
// Per-transaction state, keyed (and ordered) by transaction id.
final Map<TxnId, State> stateMap = new TreeMap<>();
// True from the moment ensureScheduled() arms a run until run() starts executing.
boolean isScheduled;
/**
 * Creates the progress log instance for {@code commandStore}.
 */
Instance(CommandStore commandStore)
{
this.commandStore = commandStore;
}
/**
 * Returns the state tracked for {@code txnId}, creating and registering it if absent. Creation is
 * reported to the metrics listener as a size change of 1.
 */
State ensure(TxnId txnId)
{
    return stateMap.computeIfAbsent(txnId, id -> {
        State created = new State(id);
        node.agent().metricsEventsListener().onProgressLogSizeChange(id, 1);
        return created;
    });
}
/** @return {@code state} when non-null, otherwise the (possibly new) state registered for {@code txnId} */
State ensure(TxnId txnId, State state)
{
    if (state == null)
        return ensure(txnId);
    return state;
}
/**
 * Shared handling for accepted/committed/executed notifications.
 * If {@code newStatus} is at least Committed, any blocking monitor is told the command is committed.
 * Then, on a progress shard, the home shard advances its coordination monitor while any other progress
 * shard marks its non-home monitor safe.
 *
 * @param shard must not be {@code Unsure}
 * @param newStatus must be at most ReadyToExecute
 */
private void ensureSafeOrAtLeast(Command command, ProgressShard shard, CoordinateStatus newStatus, Progress newProgress)
{
    Invariants.checkState(shard != Unsure);
    State state = null;
    assert newStatus.isAtMostReadyToExecute();
    if (newStatus.isAtLeastCommitted())
        state = recordCommit(command.txnId());

    if (!shard.isProgress())
        return;

    state = ensure(command.txnId(), state);
    if (shard.isHome()) state.ensureAtLeast(command, newStatus, newProgress);
    else state.setSafe(); // state is already resolved above; no second map lookup needed
}
/**
 * Tells the blocking monitor of {@code txnId}, if any, that the command is now known to the level of
 * {@code SaveStatus.Committed}.
 *
 * @return the existing state for {@code txnId}, or null if none is registered
 */
State recordCommit(TxnId txnId)
{
    State state = stateMap.get(txnId);
    if (state == null || state.blockingState == null)
        return state;

    state.blockingState.record(SaveStatus.Committed.known);
    return state;
}
/**
 * Tells the blocking monitor of {@code txnId}, if any, that the command is now known to the level of
 * {@code SaveStatus.PreApplied}.
 *
 * @return the existing state for {@code txnId}, or null if none is registered
 */
State recordApply(TxnId txnId)
{
    State state = stateMap.get(txnId);
    if (state == null || state.blockingState == null)
        return state;

    state.blockingState.record(SaveStatus.PreApplied.known);
    return state;
}
/** On the home shard only: start expecting progress for a transaction at status Uncommitted. */
@Override
public void unwitnessed(TxnId txnId, ProgressShard shard)
{
    if (!shard.isHome())
        return;

    State state = ensure(txnId);
    state.ensureAtLeast(Uncommitted, Expected);
}
/**
 * On a progress shard: the home shard starts expecting progress at status Uncommitted, any other
 * progress shard makes sure a non-home monitor exists. Other shards do nothing.
 *
 * @param shard must not be {@code Unsure}
 */
@Override
public void preaccepted(Command command, ProgressShard shard)
{
    Invariants.checkState(shard != Unsure);
    if (!shard.isProgress())
        return;

    State state = ensure(command.txnId());
    if (shard.isHome())
    {
        state.ensureAtLeast(command, Uncommitted, Expected);
    }
    else
    {
        state.touchNonHomeUnsafe();
    }
}
/**
 * Delegates to ensureSafeOrAtLeast with status Uncommitted and progress Expected.
 */
@Override
public void accepted(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
}
/** Tells the blocking monitor, if one exists, that the command is now known to the level of {@code SaveStatus.PreCommitted}. */
@Override
public void precommitted(Command command)
{
    State state = stateMap.get(command.txnId());
    if (state == null || state.blockingState == null)
        return;

    state.blockingState.record(SaveStatus.PreCommitted.known);
}
/**
 * Delegates to ensureSafeOrAtLeast with status Committed and progress NoneExpected.
 */
@Override
public void committed(Command command, ProgressShard shard)
{
ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed, NoneExpected);
}
/**
 * If a coordination monitor exists for the command, advances it to ReadyToExecute and starts expecting
 * progress. Any non-home monitor must already be Done at this point.
 */
@Override
public void readyToExecute(Command command)
{
    State state = stateMap.get(command.txnId());
    if (state == null)
        return; // not progress shard, and nothing blocking

    boolean nonHomeIsSafe = state.nonHomeState == null || state.nonHomeState.progress() == Done;
    Invariants.checkState(nonHomeIsSafe, "nonHomeState should have been set safe by call to committed");

    if (state.coordinateState == null)
        return;

    state.coordinateState.ensureAtLeast(command, ReadyToExecute, Expected);
}
/**
 * Tells any blocking monitor the command has reached PreApplied, then delegates to ensureSafeOrAtLeast
 * with status ReadyToExecute and progress Expected.
 */
@Override
public void executed(Command command, ProgressShard shard)
{
recordApply(command.txnId());
// this is the home shard's state ONLY, so we don't know it is fully durable locally
ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected);
}
/**
 * Forgets {@code txnId}: removes its state from the map and finishes each monitor it had (blocking,
 * then non-home, then coordination). Does nothing if no state is registered.
 */
@Override
public void clear(TxnId txnId)
{
    State removed = stateMap.remove(txnId);
    if (removed != null)
    {
        if (removed.blockingState != null)
            removed.blockingState.ensureDone();

        if (removed.nonHomeState != null)
            removed.nonHomeState.ensureDone();

        if (removed.coordinateState != null)
            removed.coordinateState.durableGlobal();
    }
}
/**
 * Handles a command becoming durable. A state is always ensured for the transaction; if the route is
 * known, a blocking monitor is recorded when the command has not yet been PreApplied and its route
 * participates in the ranges this store had at the transaction's epoch, and coordination is marked
 * finished when those ranges contain the route's home key.
 */
@Override
public void durable(Command command)
{
    TxnId id = command.txnId();
    State state = ensure(id);
    // if we participate in the transaction and its outcome hasn't been recorded locally then fetch it
    // TODO (expected): we should delete the in-memory state once we reach Done, and guard against backward progress with Command state
    // however, we need to be careful:
    // - we might participate in the execution epoch so need to be sure we have received a route covering both
    if (command.route() == null)
        return;

    Ranges coordinateRanges = commandStore.unsafeRangesForEpoch().allAt(id.epoch());

    boolean outcomePending = !command.status().hasBeen(PreApplied);
    if (outcomePending && command.route().participatesIn(coordinateRanges))
        state.recordBlocking(id, WaitingToApply, command.route(), null);

    boolean ownsHomeKey = coordinateRanges.contains(command.route().homeKey());
    if (ownsHomeKey)
        state.coordinate().durableGlobal();
}
/**
 * Records that local execution is blocked on {@code blockedBy} until it reaches {@code blockedUntil}.
 * Nothing is recorded for local transactions, or when the command already has what
 * {@code blockedUntil} requires.
 */
@Override
public void waiting(SafeCommand blockedBy, LocalExecution blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants)
{
    TxnId blockedById = blockedBy.txnId();
    if (blockedById.rw().isLocal())
        return;

    // ensure we have a record to work with later; otherwise may think has been truncated
    blockedBy.initialise();
    boolean alreadySatisfied = blockedBy.current().has(blockedUntil.requires);
    if (alreadySatisfied)
        return;

    // TODO (consider): consider triggering a preemption of existing coordinator (if any) in some circumstances;
    // today, an LWT can pre-empt more efficiently (i.e. instantly) a failed operation whereas Accord will
    // wait for some progress interval before taking over; there is probably some middle ground where we trigger
    // faster preemption once we're blocked on a transaction, while still offering some amount of time to complete.
    // TODO (desirable, efficiency): forward to local progress shard for processing (if known)
    // TODO (desirable, efficiency): if we are co-located with the home shard, don't need to do anything unless we're in a
    // later topology that wasn't covered by its coordination
    State state = ensure(blockedById);
    state.recordBlocking(blockedById, blockedUntil, blockedOnRoute, blockedOnParticipants);
}
/**
 * Links {@code add} at the head of this instance's list, then makes sure a polling run is scheduled.
 */
@Override
public void addFirst(State.Monitoring add)
{
super.addFirst(add);
ensureScheduled();
}
/**
 * Not supported by this list.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void addLast(State.Monitoring add)
{
throw new UnsupportedOperationException();
}
/**
 * Arms a single delayed polling run unless one is already pending. The delay comes from the node's
 * local config, and the run itself is submitted through the command store.
 */
void ensureScheduled()
{
    if (!isScheduled)
    {
        isScheduled = true;
        Duration delay = node.localConfig().getProgressLogScheduleDelay();
        node.scheduler().once(() -> commandStore.execute(empty(), ignore -> run()).begin(commandStore.agent()),
                              delay.toNanos(),
                              TimeUnit.NANOSECONDS);
    }
}
/**
 * One polling pass over the queued monitors. Every monitor whose {@code shouldRun()} returns true gets
 * a task submitted to the command store; that task re-checks {@code shouldRun()} before acting.
 * Failures are logged rather than propagated, and another pass is scheduled whenever monitors remain
 * queued. While {@code PAUSE_FOR_TEST} is set the pass does nothing beyond logging.
 */
@Override
public void run()
{
    isScheduled = false;
    try
    {
        if (PAUSE_FOR_TEST)
        {
            logger.info("Skipping progress log because it is paused for test");
            return;
        }

        for (State.Monitoring run : this)
        {
            if (run.shouldRun())
            {
                commandStore.execute(contextFor(run.txnId()), safeStore -> {
                    if (run.shouldRun()) // could have been completed by a callback
                    {
                        SafeCommand safeCommand = safeStore.ifInitialised(run.txnId());
                        if (safeCommand != null && run.shouldRun()) // could be truncated on access
                            run.run(safeStore, safeCommand);
                    }
                }).begin(commandStore.agent());
            }
        }
    }
    catch (Throwable t)
    {
        // best effort: keep the poller alive, but report through the class logger instead of stderr
        logger.error("Unexpected failure while running progress log", t);
    }
    finally
    {
        if (!isEmpty())
            ensureScheduled();
    }
}
}
/**
 * Creates the progress log for {@code commandStore} and appends it to {@code instances}.
 *
 * @return the newly created instance
 */
@Override
public Instance create(CommandStore commandStore)
{
Instance instance = new Instance(commandStore);
instances.add(instance);
return instance;
}
}