blob: c20c6f3d88aca8df690191ba58cca858aba6944a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.List;
import javax.annotation.Nullable;
import accord.api.Result;
import accord.local.*;
import accord.local.Node.Id;
import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.LatestDeps;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.SafeCommandStore.TestDep.WITHOUT;
import static accord.local.SafeCommandStore.TestStartedAt.ANY;
import static accord.local.SafeCommandStore.TestStatus.IS_STABLE;
import static accord.local.SafeCommandStore.TestStatus.IS_PROPOSED;
import static accord.local.SafeCommandStore.TestStartedAt.STARTED_AFTER;
import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
import static accord.local.Status.Phase;
import static accord.local.Status.PreAccepted;
import static accord.local.Status.PreCommitted;
import static accord.messages.PreAccept.calculatePartialDeps;
import static accord.utils.Invariants.illegalState;
/**
 * Request sent by a recovery coordinator asking a replica to begin recovering {@link #txnId} under {@link #ballot}.
 * <p>
 * Each local command store handles the request in {@link #apply(SafeCommandStore)}. The per-store replies are
 * combined by {@link #reduce(RecoverReply, RecoverReply)}, and {@link #accept(RecoverReply, Throwable)} sends the
 * combined reply (or failure) back to the coordinator.
 */
public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
{
    /** Factory hooks for serializers, exposing the otherwise private constructor. */
    public static class SerializationSupport
    {
        public static BeginRecovery create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, PartialTxn partialTxn, Ballot ballot, @Nullable FullRoute<?> route)
        {
            return new BeginRecovery(txnId, scope, waitForEpoch, partialTxn, ballot, route);
        }
    }

    /** The slice of the transaction covered by this request's scope. */
    public final PartialTxn partialTxn;
    /** The ballot under which the coordinator is attempting recovery. */
    public final Ballot ballot;
    /** The transaction's full route; may be null when built via {@link SerializationSupport#create}. */
    public final FullRoute<?> route;

    /**
     * Creates a recovery request for replica {@code to}, slicing {@code txn} to the scope
     * established by the superclass constructor.
     */
    public BeginRecovery(Id to, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Ballot ballot)
    {
        super(to, topologies, route, txnId);
        // TODO (expected): only scope.contains(route.homeKey); this affects state eviction and is low priority given size in C*
        this.partialTxn = txn.slice(scope.covering(), true);
        this.ballot = ballot;
        this.route = route;
    }

    private BeginRecovery(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, PartialTxn partialTxn, Ballot ballot, @Nullable FullRoute<?> route)
    {
        super(txnId, scope, waitForEpoch);
        this.partialTxn = partialTxn;
        this.ballot = ballot;
        this.route = route;
    }

    @Override
    protected void process()
    {
        // run apply() on the local command stores for txnId's epoch; this request also acts as reducer and consumer
        node.mapReduceConsumeLocal(this, txnId.epoch(), txnId.epoch(), this);
    }

    /**
     * Attempts recovery of {@link #txnId} at {@link #ballot} on a single command store.
     *
     * @return a {@link RecoverNack} if the command is Redundant/Truncated (no ballot) or the ballot was rejected
     *         (carrying the currently promised ballot); otherwise a {@link RecoverOk} describing this store's view
     *         of the transaction and of conflicting transactions relevant to fast-path recovery
     * @throws IllegalStateException if {@code Commands.recover} yields an outcome not handled here
     */
    @Override
    public RecoverReply apply(SafeCommandStore safeStore)
    {
        SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
        switch (Commands.recover(safeStore, safeCommand, txnId, partialTxn, route, progressKey, ballot))
        {
            case Redundant:
            case Truncated:
                return new RecoverNack(null);
            case RejectedBallot:
                // tell the coordinator which ballot superseded its own
                return new RecoverNack(safeCommand.current().promised());
            case Success:
                break;
            default:
                throw illegalState("Unhandled Outcome");
        }

        Command command = safeCommand.current();
        PartialDeps coordinatedDeps = command.partialDeps();
        PartialDeps localDeps = null;
        if (!command.known().deps.hasCommittedOrDecidedDeps())
        {
            // deps are not yet committed/decided here, so also report the deps this store calculates now
            // TODO (required): consider whether we are safe ignoring the concept of minUnsyncedEpoch here
            localDeps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, txnId, safeStore.ranges().coordinates(txnId));
        }
        LatestDeps deps = LatestDeps.create(safeStore.ranges().allAt(txnId.epoch()), command.known().deps, command.acceptedOrCommitted(), coordinatedDeps, localDeps);

        boolean rejectsFastPath;
        Deps earlierCommittedWitness, earlierAcceptedNoWitness;
        if (command.hasBeen(PreCommitted))
        {
            // the command has reached PreCommitted, so no fast-path evidence is gathered
            rejectsFastPath = false;
            earlierCommittedWitness = earlierAcceptedNoWitness = Deps.NONE;
        }
        else
        {
            // TODO (expected): if we can combine these with the earlierAcceptedNoWitness we can avoid persisting deps on Accept
            //  the problem is we need some way to ensure liveness. If we were to store witnessedAt as a separate register
            //  we could return these and filter them by whatever the max witnessedAt is that we discover, OR we could
            //  filter on replicas to exclude any that are started after anything that is committed, since they will have to adopt
            //  them as a dependency (but we have to make sure we consider dependency rules, so if there's no write and only reads)
            //  we might still have new transactions block our execution.
            Ranges ranges = safeStore.ranges().allAt(txnId);
            // any later transaction that did not witness us proves we cannot have taken the fast path;
            // the second (stable) check only runs if the first found nothing
            rejectsFastPath = hasAcceptedOrCommittedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys())
                              || hasStableExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
            // TODO (expected, testing): introduce some good unit tests for verifying these two functions in a real repair scenario
            // stable txns with an earlier txnid that have our txnid as a dependency
            earlierCommittedWitness = stableStartedBeforeAndWitnessed(safeStore, txnId, ranges, partialTxn.keys());
            // accepted txns with an earlier txnid that don't have our txnid as a dependency
            earlierAcceptedNoWitness = acceptedOrCommittedStartedBeforeWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
        }

        Status status = command.status();
        Ballot accepted = command.acceptedOrCommitted();
        Timestamp executeAt = command.executeAt();
        Writes writes = command.writes();
        Result result = command.result();
        return new RecoverOk(txnId, status, accepted, executeAt, deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, writes, result);
    }

    /**
     * Merges two per-store replies.
     * <p>
     * If either reply is a nack it is returned ({@code r1} checked first). Otherwise the reply with the greater
     * (status, accepted ballot) supplies status, accepted ballot, writes and result; deps and the witness sets are
     * unioned, and {@code rejectsFastPath} is true if either reply rejects the fast path.
     *
     * @throws IllegalStateException if the most advanced reply has not reached PreAccepted
     */
    @Override
    public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
    {
        // TODO (low priority, efficiency): should not operate on dependencies directly here, as we only merge them;
        //  want a cheaply mergeable variant (or should collect them before merging)
        if (!r1.isOk()) return r1;
        if (!r2.isOk()) return r2;

        RecoverOk ok1 = (RecoverOk) r1;
        RecoverOk ok2 = (RecoverOk) r2;
        // set ok1 to the most recent of the two
        if (ok1 != Status.max(ok1, ok1.status, ok1.accepted, ok2, ok2.status, ok2.accepted))
        {
            RecoverOk tmp = ok1;
            ok1 = ok2;
            ok2 = tmp;
        }
        if (!ok1.status.hasBeen(PreAccepted))
            throw illegalState("Expected merged recovery status to have been PreAccepted, but was " + ok1.status);

        LatestDeps deps = LatestDeps.merge(ok1.deps, ok2.deps);
        Deps earlierCommittedWitness = ok1.earlierCommittedWitness.with(ok2.earlierCommittedWitness);
        // drop any txn that some store reported as stable and witnessing us
        Deps earlierAcceptedNoWitness = ok1.earlierAcceptedNoWitness.with(ok2.earlierAcceptedNoWitness)
                                                                     .without(earlierCommittedWitness::contains);
        // while the merged status is only PreAccepted, take the later of the two executeAts; otherwise trust ok1
        Timestamp timestamp = ok1.status == PreAccepted ? Timestamp.max(ok1.executeAt, ok2.executeAt) : ok1.executeAt;
        return new RecoverOk(
            txnId, ok1.status, ok1.accepted, timestamp,
            deps, earlierCommittedWitness, earlierAcceptedNoWitness,
            ok1.rejectsFastPath || ok2.rejectsFastPath,
            ok1.writes, ok1.result
        );
    }

    /** Sends the merged reply (or the failure) back to the requester. */
    @Override
    public void accept(RecoverReply reply, Throwable failure)
    {
        node.reply(replyTo, replyContext, reply, failure);
    }

    @Override
    public TxnId primaryTxnId()
    {
        return txnId;
    }

    @Override
    public Seekables<?, ?> keys()
    {
        return partialTxn.keys();
    }

    @Override
    public KeyHistory keyHistory()
    {
        return KeyHistory.COMMANDS;
    }

    @Override
    public MessageType type()
    {
        return MessageType.BEGIN_RECOVER_REQ;
    }

    @Override
    public String toString()
    {
        return "BeginRecovery{" +
               "txnId:" + txnId +
               ", txn:" + partialTxn +
               ", ballot:" + ballot +
               '}';
    }

    /** Base type of the replies to {@link BeginRecovery}. */
    public static abstract class RecoverReply implements Reply
    {
        @Override
        public MessageType type()
        {
            return MessageType.BEGIN_RECOVER_RSP;
        }

        /** @return true for a {@link RecoverOk}, false for a {@link RecoverNack} */
        public abstract boolean isOk();
    }

    /** Successful recovery reply: this replica's view of the transaction and of relevant conflicting transactions. */
    public static class RecoverOk extends RecoverReply
    {
        public final TxnId txnId; // for debugging
        public final Status status;
        public final Ballot accepted;
        public final Timestamp executeAt;
        public final LatestDeps deps;
        public final Deps earlierCommittedWitness; // counter-point to earlierAcceptedNoWitness
        public final Deps earlierAcceptedNoWitness; // wait for these to commit
        public final boolean rejectsFastPath;
        public final Writes writes;
        public final Result result;

        public RecoverOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, LatestDeps deps, Deps earlierCommittedWitness, Deps earlierAcceptedNoWitness, boolean rejectsFastPath, Writes writes, Result result)
        {
            this.txnId = txnId;
            this.status = status;
            this.accepted = accepted;
            this.executeAt = executeAt;
            this.deps = deps;
            this.earlierCommittedWitness = earlierCommittedWitness;
            this.earlierAcceptedNoWitness = earlierAcceptedNoWitness;
            this.rejectsFastPath = rejectsFastPath;
            this.writes = writes;
            this.result = result;
        }

        @Override
        public boolean isOk()
        {
            return true;
        }

        @Override
        public String toString()
        {
            return toString("RecoverOk");
        }

        String toString(String kind)
        {
            return kind + "{" +
                   "txnId:" + txnId +
                   ", status:" + status +
                   ", accepted:" + accepted +
                   ", executeAt:" + executeAt +
                   ", deps:" + deps +
                   ", earlierCommittedWitness:" + earlierCommittedWitness +
                   ", earlierAcceptedNoWitness:" + earlierAcceptedNoWitness +
                   ", rejectsFastPath:" + rejectsFastPath +
                   ", writes:" + writes +
                   ", result:" + result +
                   '}';
        }

        /**
         * Picks the reply with the greatest (status, accepted ballot), considering only replies whose
         * status phase is at least {@code Phase.Accept}.
         */
        public static RecoverOk maxAcceptedOrLater(List<RecoverOk> recoverOks)
        {
            return Status.max(recoverOks, r -> r.status, r -> r.accepted, r -> r.status.phase.compareTo(Phase.Accept) >= 0);
        }
    }

    /** Negative recovery reply; {@link #supersededBy} is the promised ballot when our ballot was rejected, else null. */
    public static class RecoverNack extends RecoverReply
    {
        public final @Nullable Ballot supersededBy;

        public RecoverNack(@Nullable Ballot supersededBy)
        {
            this.supersededBy = supersededBy;
        }

        @Override
        public boolean isOk()
        {
            return false;
        }

        @Override
        public String toString()
        {
            return "RecoverNack{" +
                   "supersededBy:" + supersededBy +
                   '}';
        }
    }

    /**
     * Collects transactions that started before {@code startedBefore}, have reached the IS_PROPOSED status test,
     * did not witness {@code startedBefore}, and whose executeAt is later than {@code startedBefore}.
     */
    private static Deps acceptedOrCommittedStartedBeforeWithoutWitnessing(SafeCommandStore safeStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
    {
        try (Deps.Builder builder = Deps.builder())
        {
            // any transaction that started before us, is proposed, did not witness us, yet executes after us
            safeStore.mapReduceFull(keys, ranges, startedBefore, startedBefore.kind().witnessedBy(), STARTED_BEFORE, WITHOUT, IS_PROPOSED,
                                    (startedBefore0, keyOrRange, txnId, executeAt, prev) -> {
                                        if (executeAt.compareTo(startedBefore0) > 0)
                                            builder.add(keyOrRange, txnId);
                                        return builder;
                                    }, startedBefore, builder);
            return builder.build();
        }
    }

    /** Collects stable transactions that started before {@code startedBefore} and did witness it. */
    private static Deps stableStartedBeforeAndWitnessed(SafeCommandStore safeStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
    {
        try (Deps.Builder builder = Deps.builder())
        {
            safeStore.mapReduceFull(keys, ranges, startedBefore, startedBefore.kind().witnessedBy(), STARTED_BEFORE, WITH, IS_STABLE,
                                    (p1, keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), null, (Deps.AbstractBuilder<Deps>)builder);
            return builder.build();
        }
    }

    private static boolean hasAcceptedOrCommittedStartedAfterWithoutWitnessing(SafeCommandStore safeStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
    {
        /*
         * The idea here is to discover those transactions that were started after us and have been Accepted
         * and did not witness us as part of their pre-accept round, as this means that we CANNOT have taken
         * the fast path. This is central to safe recovery, as if every transaction that executes later has
         * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
         * has not witnessed us we can safely invalidate us.
         *
         * TODO (required): consider carefully how _adding_ ranges to a CommandStore affects this
         */
        return safeStore.mapReduceFull(keys, ranges, startedAfter, startedAfter.kind().witnessedBy(), STARTED_AFTER, WITHOUT, IS_PROPOSED,
                                       (p1, keyOrRange, txnId, executeAt, prev) -> true, null, false);
    }

    private static boolean hasStableExecutesAfterWithoutWitnessing(SafeCommandStore safeStore, TxnId executesAfter, Ranges ranges, Seekables<?, ?> keys)
    {
        /*
         * The idea here is to discover those transactions that have been decided to execute after us
         * and did not witness us as part of their pre-accept or accept round, as this means that we CANNOT have
         * taken the fast path. This is central to safe recovery, as if every transaction that executes later has
         * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
         * has not witnessed us we can safely invalidate us (not the transaction that failed to witness us).
         */
        return safeStore.mapReduceFull(keys, ranges, executesAfter, executesAfter.kind().witnessedBy(), ANY, WITHOUT, IS_STABLE,
                                       (p1, keyOrRange, txnId, executeAt, prev) -> true, null, false);
    }
}