blob: 3d699c8eb45f69a67616621a0f05b84d87506f58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import accord.api.Result;
import accord.local.SafeCommandStore;
import accord.local.Status.Phase;
import accord.primitives.*;
import accord.topology.Topologies;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.utils.Invariants;
import accord.local.Node.Id;
import accord.local.Command;
import accord.local.Status;
import java.util.Collections;
import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.SafeCommandStore.TestDep.WITHOUT;
import static accord.local.SafeCommandStore.TestKind.RorWs;
import static accord.local.SafeCommandStore.TestTimestamp.*;
import static accord.local.Status.*;
import static accord.messages.PreAccept.calculatePartialDeps;
public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply>
{
public static class SerializationSupport
{
public static BeginRecovery create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, PartialTxn partialTxn, Ballot ballot, @Nullable FullRoute<?> route)
{
return new BeginRecovery(txnId, scope, waitForEpoch, partialTxn, ballot, route);
}
}
public final PartialTxn partialTxn;
public final Ballot ballot;
public final @Nullable FullRoute<?> route;
public BeginRecovery(Id to, Topologies topologies, TxnId txnId, Txn txn, FullRoute<?> route, Ballot ballot)
{
super(to, topologies, route, txnId);
this.partialTxn = txn.slice(scope.covering(), scope.contains(scope.homeKey()));
this.ballot = ballot;
this.route = scope.contains(scope.homeKey()) ? route : null;
}
private BeginRecovery(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, PartialTxn partialTxn, Ballot ballot, @Nullable FullRoute<?> route)
{
super(txnId, scope, waitForEpoch);
this.partialTxn = partialTxn;
this.ballot = ballot;
this.route = route;
}
@Override
protected void process()
{
node.mapReduceConsumeLocal(this, txnId.epoch(), txnId.epoch(), this);
}
@Override
public RecoverReply apply(SafeCommandStore safeStore)
{
Command command = safeStore.command(txnId);
switch (command.recover(safeStore, partialTxn, route != null ? route : scope, progressKey, ballot))
{
default:
throw new IllegalStateException("Unhandled Outcome");
case Redundant:
throw new IllegalStateException("Invalid Outcome");
case RejectedBallot:
return new RecoverNack(command.promised());
case Success:
}
PartialDeps deps = command.partialDeps();
if (!command.known().deps.hasProposedOrDecidedDeps())
{
deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().at(txnId.epoch()));
}
boolean rejectsFastPath;
Deps earlierCommittedWitness, earlierAcceptedNoWitness;
if (command.hasBeen(PreCommitted))
{
rejectsFastPath = false;
earlierCommittedWitness = earlierAcceptedNoWitness = Deps.NONE;
}
else
{
Ranges ranges = safeStore.ranges().at(txnId.epoch());
rejectsFastPath = hasAcceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
if (!rejectsFastPath)
rejectsFastPath = hasCommittedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
// TODO (expected, testing): introduce some good unit tests for verifying these two functions in a real repair scenario
// committed txns with an earlier txnid and have our txnid as a dependency
earlierCommittedWitness = committedStartedBeforeAndWitnessed(safeStore, txnId, ranges, partialTxn.keys());
// accepted txns with an earlier txnid that don't have our txnid as a dependency
earlierAcceptedNoWitness = acceptedStartedBeforeWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
}
return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result());
}
@Override
public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
{
// TODO (low priority, efficiency): should not operate on dependencies directly here, as we only merge them;
// want a cheaply mergeable variant (or should collect them before merging)
if (!r1.isOk()) return r1;
if (!r2.isOk()) return r2;
RecoverOk ok1 = (RecoverOk) r1;
RecoverOk ok2 = (RecoverOk) r2;
// set ok1 to the most recent of the two
if (ok1 != Status.max(ok1, ok1.status, ok1.accepted, ok2, ok2.status, ok2.accepted))
{
RecoverOk tmp = ok1;
ok1 = ok2;
ok2 = tmp;
}
if (!ok1.status.hasBeen(PreAccepted)) throw new IllegalStateException();
PartialDeps deps = ok1.deps.with(ok2.deps);
Deps earlierCommittedWitness = ok1.earlierCommittedWitness.with(ok2.earlierCommittedWitness);
Deps earlierAcceptedNoWitness = ok1.earlierAcceptedNoWitness.with(ok2.earlierAcceptedNoWitness)
.without(earlierCommittedWitness::contains);
Timestamp timestamp = ok1.status == PreAccepted ? Timestamp.max(ok1.executeAt, ok2.executeAt) : ok1.executeAt;
return new RecoverOk(
txnId, ok1.status, ok1.accepted, timestamp, deps,
earlierCommittedWitness, earlierAcceptedNoWitness,
ok1.rejectsFastPath | ok2.rejectsFastPath,
ok1.writes, ok1.result);
}
@Override
public void accept(RecoverReply reply, Throwable failure)
{
node.reply(replyTo, replyContext, reply);
}
@Override
public Iterable<TxnId> txnIds()
{
return Collections.singleton(txnId);
}
@Override
public Seekables<?, ?> keys()
{
return partialTxn.keys();
}
@Override
public MessageType type()
{
return MessageType.BEGIN_RECOVER_REQ;
}
@Override
public String toString()
{
return "BeginRecovery{" +
"txnId:" + txnId +
", txn:" + partialTxn +
", ballot:" + ballot +
'}';
}
public static abstract class RecoverReply implements Reply
{
@Override
public MessageType type()
{
return MessageType.BEGIN_RECOVER_RSP;
}
public abstract boolean isOk();
}
public static class RecoverOk extends RecoverReply
{
public final TxnId txnId; // for debugging
public final Status status;
public final Ballot accepted;
public final Timestamp executeAt;
public final PartialDeps deps;
public final Deps earlierCommittedWitness; // counter-point to earlierAcceptedNoWitness
public final Deps earlierAcceptedNoWitness; // wait for these to commit
public final boolean rejectsFastPath;
public final Writes writes;
public final Result result;
public RecoverOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, @Nonnull PartialDeps deps, Deps earlierCommittedWitness, Deps earlierAcceptedNoWitness, boolean rejectsFastPath, Writes writes, Result result)
{
this.txnId = txnId;
this.accepted = accepted;
this.executeAt = executeAt;
this.status = status;
this.deps = Invariants.nonNull(deps);
this.earlierCommittedWitness = earlierCommittedWitness;
this.earlierAcceptedNoWitness = earlierAcceptedNoWitness;
this.rejectsFastPath = rejectsFastPath;
this.writes = writes;
this.result = result;
}
@Override
public boolean isOk()
{
return true;
}
@Override
public String toString()
{
return toString("RecoverOk");
}
String toString(String kind)
{
return kind + "{" +
"txnId:" + txnId +
", status:" + status +
", accepted:" + accepted +
", executeAt:" + executeAt +
", deps:" + deps +
", earlierCommittedWitness:" + earlierCommittedWitness +
", earlierAcceptedNoWitness:" + earlierAcceptedNoWitness +
", rejectsFastPath:" + rejectsFastPath +
", writes:" + writes +
", result:" + result +
'}';
}
public static RecoverOk maxAcceptedOrLater(List<RecoverOk> recoverOks)
{
return Status.max(recoverOks, r -> r.status, r -> r.accepted, r -> r.status.phase.compareTo(Phase.Accept) >= 0);
}
}
public static class RecoverNack extends RecoverReply
{
public final Ballot supersededBy;
public RecoverNack(Ballot supersededBy)
{
this.supersededBy = supersededBy;
}
@Override
public boolean isOk()
{
return false;
}
@Override
public String toString()
{
return "RecoverNack{" +
"supersededBy:" + supersededBy +
'}';
}
}
private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
{
try (Deps.Builder builder = Deps.builder())
{
commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
(keyOrRange, txnId, executeAt, prev) -> {
if (executeAt.compareTo(startedBefore) > 0)
builder.add(keyOrRange, txnId);
return builder;
}, builder, null);
return builder.build();
}
}
private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
{
try (Deps.Builder builder = Deps.builder())
{
commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null,
(keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder<Deps>)builder, null);
return builder.build();
}
}
private static boolean hasAcceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
{
/*
* The idea here is to discover those transactions that were started after us and have been Accepted
* and did not witness us as part of their pre-accept round, as this means that we CANNOT have taken
* the fast path. This is central to safe recovery, as if every transaction that executes later has
* witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
* has not witnessed us we can safely invalidate (us).
*/
return commandStore.mapReduce(keys, ranges, RorWs, STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted,
(keyOrRange, txnId, executeAt, prev) -> true, false, true);
}
private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
{
/*
* The idea here is to discover those transactions that have been decided to execute after us
* and did not witness us as part of their pre-accept or accept round, as this means that we CANNOT have
* taken the fast path. This is central to safe recovery, as if every transaction that executes later has
* witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
* has not witnessed us we can safely invalidate it.
*/
return commandStore.mapReduce(keys, ranges, RorWs, EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null,
(keyOrRange, txnId, executeAt, prev) -> true,false, true);
}
}