// blob: 45f9f308f018cd898470d83f18444846fa0bbffc [file] [log] [blame]
package accord.messages;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import accord.local.Instance;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Data;
import accord.messages.Reply;
import accord.messages.Request;
import accord.local.Command;
import accord.local.Listener;
import accord.local.Status;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.txn.Timestamp;
import accord.api.Scheduler.Scheduled;
import accord.utils.DeterministicIdentitySet;
public class ReadData implements Request
{
/**
 * Tracks one read request for {@code txnId} on this node and replies to the requester.
 *
 * <p>{@link #setup} collects the local instances for the transaction into {@link #waitingOn};
 * each instance's command is either read immediately (ReadyToExecute), listened to until it
 * becomes readable, or causes the whole read to be marked obsolete (Executed/Applied).
 * Exactly one of {@code ReadOk} (all instances read) or {@code ReadNack} (obsolete) is sent as
 * the outcome; {@code ReadWaiting} replies may be sent in between by {@link ReportWaiting}.
 */
static class LocalRead implements Listener
{
    final TxnId txnId;
    final Node node;
    final Node.Id replyToNode;
    final long replyToMessage;

    /** Merged result of the reads performed so far; {@code null} until the first read. */
    Data data;
    boolean isObsolete; // TODO: respond with the Executed result we have stored?
    /** Instances still to be read; assigned by {@link #setup}, so {@code null} before it runs. */
    Set<Instance> waitingOn;
    /** Handle for the scheduled {@link ReportWaiting} task, cancelled on completion or obsolescence. */
    Scheduled waitingOnReporter;

    /**
     * @param txnId          transaction whose commands are looked up on each instance
     * @param node           node used for scheduling, sending and replying
     * @param replyToNode    node the replies are addressed to
     * @param replyToMessage message id the replies are addressed to
     */
    LocalRead(TxnId txnId, Node node, Id replyToNode, long replyToMessage)
    {
        this.txnId = txnId;
        this.node = node;
        this.replyToNode = replyToNode;
        this.replyToMessage = replyToMessage;
        // TODO: this is messy, we want a complete separate liveness mechanism that ensures progress for all transactions
        this.waitingOnReporter = node.scheduler().once(new ReportWaiting(), 1L, TimeUnit.SECONDS);
    }

    /**
     * Reports to the requester which command (if any) is blocking this read.
     * Runs once from the scheduler, and again whenever the command it last reported changes.
     */
    class ReportWaiting implements Listener, Runnable
    {
        @Override
        public void onChange(Command command)
        {
            // One-shot registration: run() re-registers on whichever command blocks us now.
            command.removeListener(this);
            run();
        }

        @Override
        public void run()
        {
            // obsolete() only cancels the scheduled task; a listener registered by a previous
            // run() can still fire afterwards. Do not send ReadWaiting once ReadNack has gone out.
            // waitingOn is null if we are invoked before setup() has assigned it.
            // NOTE(review): these fields are read without holding the LocalRead monitor, as in
            // the original iteration over waitingOn - confirm the intended threading model.
            if (isObsolete || waitingOn == null)
                return;

            // Find the first instance whose command for txnId reports a blocking command.
            Command blockedBy = null;
            Iterator<Instance> i = waitingOn.iterator();
            while (blockedBy == null && i.hasNext())
                blockedBy = i.next().command(txnId).blockedBy();

            if (blockedBy == null)
                return;

            blockedBy.addListener(this);
            assert blockedBy.status().compareTo(Status.NotWitnessed) > 0;
            node.reply(replyToNode, replyToMessage, new ReadWaiting(blockedBy.txnId(), blockedBy.txn(), blockedBy.executeAt(), blockedBy.status()));
        }
    }

    /**
     * Reacts to a status change of a command this read is listening to: keeps waiting while the
     * command is not yet ReadyToExecute, otherwise deregisters and either reads it or, if it has
     * already been Executed/Applied, marks the read obsolete.
     */
    @Override
    public synchronized void onChange(Command command)
    {
        switch (command.status())
        {
            case NotWitnessed:
            case PreAccepted:
            case Accepted:
            case Committed:
                // Not readable yet: stay registered and wait for the next change.
                return;
            case Executed:
            case Applied:
                obsolete(command);
                // intentional fall-through: the listener is still removed below
            case ReadyToExecute:
        }

        command.removeListener(this);
        if (!isObsolete)
            read(command);
    }

    /**
     * Performs the read for {@code command}, merges it into {@link #data}, and replies with
     * {@code ReadOk} once no instance remains in {@link #waitingOn}.
     */
    private void read(Command command)
    {
        // TODO: threading/futures (don't want to perform expensive reads within this mutually exclusive context)
        Data next = command.txn().read(command);
        data = data == null ? next : data.merge(next);

        waitingOn.remove(command.instance);
        if (waitingOn.isEmpty())
        {
            waitingOnReporter.cancel();
            node.reply(replyToNode, replyToMessage, new ReadOk(data));
        }
    }

    /**
     * Marks the read obsolete (at most once): cancels the waiting reporter, sends an
     * {@code Apply} built from {@code command} to the command's shard, and replies {@code ReadNack}.
     */
    void obsolete(Command command)
    {
        if (!isObsolete)
        {
            isObsolete = true;
            waitingOnReporter.cancel();
            node.send(command.instance.shard, new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
            node.reply(replyToNode, replyToMessage, new ReadNack());
        }
    }

    /**
     * Collects the local instances for {@code txn}, witnesses the transaction on each, and
     * dispatches on the resulting command status: listen, mark obsolete, or read immediately.
     *
     * @throws IllegalStateException if a command is still NotWitnessed after being witnessed
     */
    synchronized void setup(TxnId txnId, Txn txn)
    {
        // TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
        waitingOn = txn.local(node).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
        waitingOn.forEach(instance -> {
            Command command = instance.command(txnId);
            command.witness(txn);
            switch (command.status())
            {
                case NotWitnessed:
                    throw new IllegalStateException();
                case PreAccepted:
                case Accepted:
                case Committed:
                    command.addListener(this);
                    break;
                case Executed:
                case Applied:
                    obsolete(command);
                    break;
                case ReadyToExecute:
                    if (!isObsolete)
                        read(command);
            }
        });
    }
}
/** Transaction id handed to the {@code LocalRead} created in {@code process}. */
final TxnId txnId;

/** Transaction handed to {@code LocalRead.setup} in {@code process}. */
final Txn txn;

/**
 * Builds a read request for the given transaction.
 *
 * @param txnId id of the transaction to read for
 * @param txn   the transaction itself
 */
public ReadData(TxnId txnId, Txn txn)
{
    this.txn = txn;
    this.txnId = txnId;
}
/**
 * Handles this request on {@code node}: creates a {@code LocalRead} addressed back to
 * {@code from}/{@code messageId} and sets it up for this request's transaction.
 *
 * @param node      node processing the request
 * @param from      node the replies are sent to
 * @param messageId message id the replies are sent against
 */
public void process(Node node, Node.Id from, long messageId)
{
    LocalRead localRead = new LocalRead(txnId, node, from, messageId);
    localRead.setup(txnId, txn);
}
/**
 * Common base type of the replies sent for a {@code ReadData} request.
 * Reports success by default; subclasses override {@link #isOK()} to signal failure.
 */
public static class ReadReply implements Reply
{
    /** @return {@code true} for this base type */
    public boolean isOK()
    {
        return true;
    }
}
/** Negative reply to a {@code ReadData} request; the only state it carries is that it is not OK. */
public static class ReadNack extends ReadReply
{
    /** @return always {@code false} */
    @Override
    public boolean isOK()
    {
        return false;
    }
}
/** Successful reply to a {@code ReadData} request, carrying the data that was read. */
public static class ReadOk extends ReadReply
{
    /** The data supplied at construction. */
    public final Data data;

    /**
     * @param data the read result to carry in this reply
     */
    public ReadOk(Data data)
    {
        this.data = data;
    }

    /** @return {@code "ReadOk{" + data + "}"} */
    @Override
    public String toString()
    {
        StringBuilder text = new StringBuilder("ReadOk{");
        text.append(data);
        text.append('}');
        return text.toString();
    }
}
/**
 * Non-final reply to a {@code ReadData} request ({@link #isFinal()} returns {@code false}),
 * describing a transaction by id, body, execution timestamp and status.
 */
public static class ReadWaiting extends ReadReply
{
    public final TxnId txnId;
    public final Txn txn;
    public final Timestamp executeAt;
    public final Status status;

    /**
     * @param txnId     id of the described transaction
     * @param txn       the described transaction
     * @param executeAt execution timestamp of the described transaction
     * @param status    status of the described transaction
     */
    public ReadWaiting(TxnId txnId, Txn txn, Timestamp executeAt, Status status)
    {
        this.status = status;
        this.executeAt = executeAt;
        this.txn = txn;
        this.txnId = txnId;
    }

    /** @return always {@code false}: this reply does not conclude the request */
    @Override
    public boolean isFinal()
    {
        return false;
    }

    /** @return a {@code ReadWaiting{...}} rendering of all four fields */
    @Override
    public String toString()
    {
        StringBuilder text = new StringBuilder("ReadWaiting{");
        text.append("txnId:").append(txnId);
        text.append(", txn:").append(txn);
        text.append(", executeAt:").append(executeAt);
        text.append(", status:").append(status);
        text.append('}');
        return text.toString();
    }
}
/** @return a {@code ReadData{...}} rendering of the transaction id and transaction */
@Override
public String toString()
{
    StringBuilder text = new StringBuilder("ReadData{");
    text.append("txnId:").append(txnId);
    text.append(", txn:").append(txn);
    text.append('}');
    return text.toString();
}
}