blob: f48877eb71a7548059da18af73400a31d6dad882 [file] [log] [blame]
package accord.impl.basic;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import accord.coordinate.Timeout;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
public class NodeSink implements MessageSink
{
final Id self;
final Function<Id, Node> lookup;
final Cluster parent;
final Random random;
int nextMessageId = 0;
Map<Long, Callback> callbacks = new LinkedHashMap<>();
public NodeSink(Id self, Function<Id, Node> lookup, Cluster parent, Random random)
{
this.self = self;
this.lookup = lookup;
this.parent = parent;
this.random = random;
}
@Override
public synchronized void send(Id to, Request send)
{
parent.add(self, to, SENTINEL_MESSAGE_ID, send);
}
@Override
public void send(Id to, Request send, Callback callback)
{
long messageId = nextMessageId++;
callbacks.put(messageId, callback);
parent.add(self, to, messageId, send);
parent.pending.add((PendingRunnable) () -> {
if (callback == callbacks.remove(messageId))
callback.onFailure(to, new Timeout());
}, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS);
}
@Override
public void reply(Id replyToNode, long replyToMessage, Reply reply)
{
parent.add(self, replyToNode, replyToMessage, reply);
}
}