blob: eb79af70b37b5cb18a1726fba527eb5df1ca3f05 [file] [log] [blame]
package accord.coordinate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import accord.messages.Preempted;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
import accord.local.Node.Id;
import accord.topology.Shards;
import accord.txn.Timestamp;
import accord.txn.Dependencies;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.messages.Accept;
import accord.messages.Accept.AcceptOk;
import accord.messages.Accept.AcceptReply;
class AcceptPhase extends CompletableFuture<Agreed>
{
final Node node;
final Ballot ballot;
final TxnId txnId;
final Txn txn;
final Shards shards;
private List<AcceptOk> acceptOks;
private Timestamp proposed;
private int[] accepts;
private int[] failures;
private int acceptQuorums;
AcceptPhase(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
{
this.node = node;
this.ballot = ballot;
this.txnId = txnId;
this.txn = txn;
this.shards = shards;
}
protected void startAccept(Timestamp executeAt, Dependencies deps)
{
this.proposed = executeAt;
this.acceptOks = new ArrayList<>();
this.accepts = new int[shards.size()];
this.failures = new int[shards.size()];
node.send(shards, new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
{
@Override
public void onSuccess(Id from, AcceptReply response)
{
onAccept(from, response);
}
@Override
public void onFailure(Id from, Throwable throwable)
{
shards.forEachOn(from, (i, shard) -> {
if (++failures[i] >= shard.slowPathQuorumSize)
completeExceptionally(new accord.messages.Timeout());
});
}
});
}
private void onAccept(Id from, AcceptReply reply)
{
if (isDone())
return;
if (!reply.isOK())
{
completeExceptionally(new Preempted());
return;
}
AcceptOk ok = (AcceptOk) reply;
acceptOks.add(ok);
shards.forEachOn(from, txn.keys(), (i, shard) -> {
if (++accepts[i] == shard.slowPathQuorumSize)
++acceptQuorums;
});
if (acceptQuorums == shards.size())
onAccepted();
}
private void onAccepted()
{
Dependencies deps = new Dependencies();
for (AcceptOk acceptOk : acceptOks)
deps.addAll(acceptOk.deps);
agreed(proposed, deps);
}
protected void agreed(Timestamp executeAt, Dependencies deps)
{
complete(new Agreed(txnId, txn, executeAt, deps, shards, null, null));
}
}