blob: 0d726b8d475fc2b348912e917107b3b10df8974e [file] [log] [blame]
package accord.coordinate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import accord.messages.Preempted;
import accord.messages.Timeout;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
import accord.txn.Dependencies;
import accord.txn.Keys;
import accord.local.Node.Id;
import accord.txn.Timestamp;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.messages.PreAccept.PreAcceptReply;
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
class Agree extends AcceptPhase implements Callback<PreAcceptReply>
final Keys keys;
public enum PreacceptOutcome { COMMIT, ACCEPT }
// TODO: handle reconfigurations
private int[] preAccepts;
private int[] fastPathPreAccepts;
private int[] failures;
private int[] responsesOutstanding;
private int preAccepted;
private int fastPathAccepted;
private int noOutstandingResponses;
private PreacceptOutcome preacceptOutcome;
private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
// TODO: hybrid fast path? or at least short-circuit accept if we gain a fast-path quorum _and_ proposed one by accept
boolean permitHybridFastPath;
private Agree(Node node, TxnId txnId, Txn txn)
super(node, Ballot.ZERO, txnId, txn, node.cluster().forKeys(txn.keys()));
this.keys = txn.keys();
this.failures = new int[shards.size()];
this.preAccepts = new int[shards.size()];
this.fastPathPreAccepts = new int[shards.size()];
this.responsesOutstanding = new int[shards.size()];
shards.forEach((i, shard) -> {
this.responsesOutstanding[i] = shard.nodes.size();
node.send(shards, new PreAccept(txnId, txn), this);
private void messageReceived(int shard)
if (--responsesOutstanding[shard] == 0)
public void onSuccess(Id from, PreAcceptReply response)
onPreAccept(from, response);
public void onFailure(Id from, Throwable throwable)
if (isDone() || isPreAccepted())
shards.forEachOn(from, (i, shard) -> {
if (++failures[i] >= shard.slowPathQuorumSize)
completeExceptionally(new Timeout());
// if no other responses are expected and the slow quorum has been satisfied, proceed
if (shouldSlowPathAccept())
private synchronized void onPreAccept(Id from, PreAcceptReply receive)
if (isDone() || isPreAccepted())
if (!receive.isOK())
// we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
completeExceptionally(new Preempted());
PreAcceptOk ok = (PreAcceptOk) receive;
boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
shards.forEachOn(from, (i, shard) -> {
if (fastPath && shard.fastPathElectorate.contains(from) && ++fastPathPreAccepts[i] == shard.fastPathQuorumSize)
if (++preAccepts[i] == shard.slowPathQuorumSize)
if (isFastPathAccepted() || shouldSlowPathAccept())
private void onPreAccepted()
if (isFastPathAccepted())
preacceptOutcome = PreacceptOutcome.COMMIT;
Dependencies deps = new Dependencies();
for (PreAcceptOk preAcceptOk : preAcceptOks)
if (preAcceptOk.witnessedAt.equals(txnId))
agreed(txnId, deps);
preacceptOutcome = PreacceptOutcome.ACCEPT;
Timestamp executeAt = Timestamp.NONE;
Dependencies deps = new Dependencies();
for (PreAcceptOk preAcceptOk : preAcceptOks)
executeAt = Timestamp.max(executeAt, preAcceptOk.witnessedAt);
// TODO: perhaps don't submit Accept immediately if we almost have enough for fast-path,
// but by sending accept we rule out hybrid fast-path
permitHybridFastPath = executeAt.compareTo(txnId) == 0;
startAccept(executeAt, deps);
private boolean isFastPathAccepted()
return fastPathAccepted == shards.size();
private boolean shouldSlowPathAccept()
return noOutstandingResponses == shards.size() && preAccepted == shards.size();
private boolean isPreAccepted()
return preacceptOutcome != null;
static CompletionStage<Agreed> agree(Node node, TxnId txnId, Txn txn)
return new Agree(node, txnId, txn);