blob: f08dd399ae97ccbe923fe9fb0eafc2d159fcf156 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate;
import java.util.*;
import java.util.function.BiConsumer;
import accord.api.Result;
import accord.coordinate.tracking.FastPathTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.primitives.*;
import accord.topology.Topologies;
import accord.messages.Callback;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.messages.PreAccept.PreAcceptReply;
import accord.utils.Invariants;
import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
import static accord.messages.Commit.Invalidate.commitInvalidate;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
*
* TODO (desired, testing): dedicated burn test to validate outcomes
*/
public class Coordinate extends AsyncResults.SettableResult<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
{
final Node node;
final TxnId txnId;
final Txn txn;
final FullRoute<?> route;
private final FastPathTracker tracker;
private boolean preAcceptIsDone;
private final List<PreAcceptOk> successes;
private Coordinate(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
{
this.node = node;
this.txnId = txnId;
this.txn = txn;
this.route = route;
Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, txnId);
this.tracker = new FastPathTracker(topologies);
this.successes = new ArrayList<>(tracker.topologies().estimateUniqueNodes());
}
private void start()
{
// TODO (desired, efficiency): consider sending only to electorate of most recent topology (as only these PreAccept votes matter)
// note that we must send to all replicas of old topology, as electorate may not be reachable
node.send(tracker.nodes(), to -> new PreAccept(to, tracker.topologies(), txnId, txn, route),
node.commandStores().select(route.homeKey()), this);
}
public static AsyncResult<Result> coordinate(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
{
Coordinate coordinate = new Coordinate(node, txnId, txn, route);
coordinate.start();
return coordinate;
}
@Override
public synchronized void onFailure(Id from, Throwable failure)
{
if (preAcceptIsDone)
return;
switch (tracker.recordFailure(from))
{
default: throw new AssertionError();
case NoChange:
break;
case Failed:
preAcceptIsDone = true;
tryFailure(new Timeout(txnId, route.homeKey()));
break;
case Success:
onPreAccepted();
}
}
@Override
public void onCallbackFailure(Id from, Throwable failure)
{
tryFailure(failure);
}
@Override
public synchronized void onSuccess(Id from, PreAcceptReply reply)
{
if (preAcceptIsDone)
return;
if (!reply.isOk())
{
// we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
tryFailure(new Preempted(txnId, route.homeKey()));
return;
}
PreAcceptOk ok = (PreAcceptOk) reply;
successes.add(ok);
boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
// TODO (desired, safety): update formalisation (and proof), as we do not seek additional pre-accepts from later epochs.
// instead we rely on accept to do our work: a quorum of accept in the later epoch
// and its effect on preaccepted timestamps and the deps it returns create our sync point.
if (tracker.recordSuccess(from, fastPath) == RequestStatus.Success)
onPreAccepted();
}
private synchronized void onPreAccepted()
{
Invariants.checkState(!preAcceptIsDone);
preAcceptIsDone = true;
if (tracker.hasFastPathAccepted())
{
Deps deps = Deps.merge(successes, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
Execute.execute(node, txnId, txn, route, txnId, deps, this);
}
else
{
Deps deps = Deps.merge(successes, ok -> ok.deps);
Timestamp executeAt; {
Timestamp accumulate = Timestamp.NONE;
for (PreAcceptOk preAcceptOk : successes)
accumulate = Timestamp.max(accumulate, preAcceptOk.witnessedAt);
executeAt = accumulate;
}
// TODO (low priority, efficiency): perhaps don't submit Accept immediately if we almost have enough for fast-path,
// but by sending accept we rule out hybrid fast-path
// TODO (low priority, efficiency): if we receive an expired response, perhaps defer to permit at least one other
// node to respond before invalidating
if (node.agent().isExpired(txnId, executeAt.hlc()))
{
proposeInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), (success, fail) -> {
if (fail != null)
{
accept(null, fail);
}
else
{
node.withEpoch(executeAt.epoch(), () -> {
commitInvalidate(node, txnId, route, executeAt);
// TODO (required, API consistency): this should be Invalidated rather than Timeout?
accept(null, new Timeout(txnId, route.homeKey()));
});
}
});
}
else
{
node.withEpoch(executeAt.epoch(), () -> {
Topologies topologies = tracker.topologies();
if (executeAt.epoch() > txnId.epoch())
topologies = node.topology().withUnsyncedEpochs(route, txnId.epoch(), executeAt.epoch());
Propose.propose(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, deps, this);
});
}
}
}
@Override
public void accept(Result success, Throwable failure)
{
if (failure instanceof CoordinateFailed)
((CoordinateFailed) failure).set(txnId, route.homeKey());
if (success != null) trySuccess(success);
else tryFailure(failure);
}
}