blob: f1da841527ff3f5c960ea051362b1846b2a26834 [file] [log] [blame]
package accord.coordinate;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.CheckStatus;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.CheckStatusReply;
import accord.messages.CheckStatus.IncludeInfo;
import accord.primitives.*;
import accord.topology.Topologies;
/**
* A result of null indicates the transaction is globally persistent
* A result of CheckStatusOk indicates the maximum status found for the transaction, which may be used to assess progress
*/
public abstract class CheckShards extends ReadCoordinator<CheckStatusReply>
{
final Unseekables<?, ?> contact;
/**
* The epoch until which we want to fetch data from remotely
* TODO (required, consider): configure the epoch we want to start with
*/
final long untilRemoteEpoch;
final IncludeInfo includeInfo;
protected CheckStatusOk merged;
protected CheckShards(Node node, TxnId txnId, Unseekables<?, ?> contact, long srcEpoch, IncludeInfo includeInfo)
{
super(node, topologyFor(node, txnId, contact, srcEpoch), txnId);
this.untilRemoteEpoch = srcEpoch;
this.contact = contact;
this.includeInfo = includeInfo;
}
private static Topologies topologyFor(Node node, TxnId txnId, Unseekables<?, ?> contact, long epoch)
{
return node.topology().preciseEpochs(contact, txnId.epoch(), epoch);
}
@Override
protected void contact(Id id)
{
Unseekables<?, ?> unseekables = contact.slice(topologies().computeRangesForNode(id));
node.send(id, new CheckStatus(txnId, unseekables, txnId.epoch(), untilRemoteEpoch, includeInfo), this);
}
protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }
protected abstract boolean isSufficient(CheckStatusOk ok);
protected Action checkSufficient(Id from, CheckStatusOk ok)
{
if (isSufficient(from, ok))
return Action.Approve;
return Action.ApproveIfQuorum;
}
@Override
protected Action process(Id from, CheckStatusReply reply)
{
if (debug != null)
debug.put(from, reply);
if (reply.isOk())
{
CheckStatusOk ok = (CheckStatusOk) reply;
if (merged == null) merged = ok;
else merged = merged.merge(ok);
return checkSufficient(from, ok);
}
else
{
onFailure(from, new IllegalStateException("Submitted command to a replica that did not own the range"));
return Action.Abort;
}
}
}