blob: 0d726b8d475fc2b348912e917107b3b10df8974e [file] [log] [blame]
package accord.coordinate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import accord.messages.Preempted;
import accord.messages.Timeout;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
import accord.txn.Dependencies;
import accord.txn.Keys;
import accord.local.Node.Id;
import accord.txn.Timestamp;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.messages.PreAccept.PreAcceptReply;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
*/
class Agree extends AcceptPhase implements Callback<PreAcceptReply>
{
final Keys keys;
public enum PreacceptOutcome { COMMIT, ACCEPT }
// TODO: handle reconfigurations
private int[] preAccepts;
private int[] fastPathPreAccepts;
private int[] failures;
private int[] responsesOutstanding;
private int preAccepted;
private int fastPathAccepted;
private int noOutstandingResponses;
private PreacceptOutcome preacceptOutcome;
private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
// TODO: hybrid fast path? or at least short-circuit accept if we gain a fast-path quorum _and_ proposed one by accept
boolean permitHybridFastPath;
private Agree(Node node, TxnId txnId, Txn txn)
{
super(node, Ballot.ZERO, txnId, txn, node.cluster().forKeys(txn.keys()));
this.keys = txn.keys();
this.failures = new int[shards.size()];
this.preAccepts = new int[shards.size()];
this.fastPathPreAccepts = new int[shards.size()];
this.responsesOutstanding = new int[shards.size()];
shards.forEach((i, shard) -> {
this.responsesOutstanding[i] = shard.nodes.size();
});
node.send(shards, new PreAccept(txnId, txn), this);
}
private void messageReceived(int shard)
{
if (--responsesOutstanding[shard] == 0)
noOutstandingResponses++;
}
@Override
public void onSuccess(Id from, PreAcceptReply response)
{
onPreAccept(from, response);
}
@Override
public void onFailure(Id from, Throwable throwable)
{
if (isDone() || isPreAccepted())
return;
shards.forEachOn(from, (i, shard) -> {
messageReceived(i);
if (++failures[i] >= shard.slowPathQuorumSize)
completeExceptionally(new Timeout());
});
// if no other responses are expected and the slow quorum has been satisfied, proceed
if (shouldSlowPathAccept())
onPreAccepted();
}
private synchronized void onPreAccept(Id from, PreAcceptReply receive)
{
if (isDone() || isPreAccepted())
return;
if (!receive.isOK())
{
// we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
completeExceptionally(new Preempted());
return;
}
PreAcceptOk ok = (PreAcceptOk) receive;
preAcceptOks.add(ok);
boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
shards.forEachOn(from, (i, shard) -> {
messageReceived(i);
if (fastPath && shard.fastPathElectorate.contains(from) && ++fastPathPreAccepts[i] == shard.fastPathQuorumSize)
++fastPathAccepted;
if (++preAccepts[i] == shard.slowPathQuorumSize)
++preAccepted;
});
if (isFastPathAccepted() || shouldSlowPathAccept())
onPreAccepted();
}
private void onPreAccepted()
{
if (isFastPathAccepted())
{
preacceptOutcome = PreacceptOutcome.COMMIT;
Dependencies deps = new Dependencies();
for (PreAcceptOk preAcceptOk : preAcceptOks)
{
if (preAcceptOk.witnessedAt.equals(txnId))
deps.addAll(preAcceptOk.deps);
}
agreed(txnId, deps);
}
else
{
preacceptOutcome = PreacceptOutcome.ACCEPT;
Timestamp executeAt = Timestamp.NONE;
Dependencies deps = new Dependencies();
for (PreAcceptOk preAcceptOk : preAcceptOks)
{
deps.addAll(preAcceptOk.deps);
executeAt = Timestamp.max(executeAt, preAcceptOk.witnessedAt);
}
// TODO: perhaps don't submit Accept immediately if we almost have enough for fast-path,
// but by sending accept we rule out hybrid fast-path
permitHybridFastPath = executeAt.compareTo(txnId) == 0;
startAccept(executeAt, deps);
}
}
private boolean isFastPathAccepted()
{
return fastPathAccepted == shards.size();
}
private boolean shouldSlowPathAccept()
{
return noOutstandingResponses == shards.size() && preAccepted == shards.size();
}
private boolean isPreAccepted()
{
return preacceptOutcome != null;
}
static CompletionStage<Agreed> agree(Node node, TxnId txnId, Txn txn)
{
return new Agree(node, txnId, txn);
}
}