blob: ccec87f91c5eb02493a7453350e9bd84771acbf7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl.basic;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import accord.api.MessageSink;
import accord.local.AgentExecutor;
import accord.burn.random.FrequentLargeRange;
import accord.messages.SafeCallback;
import accord.messages.Message;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
import static java.util.concurrent.TimeUnit.SECONDS;
public class NodeSink implements MessageSink
{
public enum Action {DELIVER, DROP, DROP_PARTITIONED, DELIVER_WITH_FAILURE, FAILURE}
private final Map<Id, Supplier<Action>> nodeActions = new HashMap<>();
private final Map<Id, LongSupplier> networkJitter = new HashMap<>();
final Id self;
final Function<Id, Node> lookup;
final Cluster parent;
final RandomSource random;
int nextMessageId = 0;
Map<Long, SafeCallback> callbacks = new LinkedHashMap<>();
public NodeSink(Id self, Function<Id, Node> lookup, Cluster parent, RandomSource random)
{
this.self = self;
this.lookup = lookup;
this.parent = parent;
this.random = random;
}
@Override
public void send(Id to, Request send)
{
maybeEnqueue(to, SENTINEL_MESSAGE_ID, send, null);
}
@Override
public void send(Id to, Request send, AgentExecutor executor, Callback callback)
{
long messageId = nextMessageId++;
SafeCallback sc = new SafeCallback(executor, callback);
callbacks.put(messageId, sc);
if (maybeEnqueue(to, messageId, send, sc))
{
parent.pending.add((PendingRunnable) () -> {
if (sc == callbacks.get(messageId))
sc.slowResponse(to);
}, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
parent.pending.add((PendingRunnable) () -> {
if (sc == callbacks.remove(messageId))
sc.timeout(to);
}, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
}
}
@Override
public void reply(Id replyToNode, ReplyContext replyContext, Reply reply)
{
maybeEnqueue(replyToNode, Packet.getMessageId(replyContext), reply, null);
}
private boolean maybeEnqueue(Node.Id to, long id, Message message, SafeCallback callback)
{
Runnable task = () -> {
Packet packet;
if (message instanceof Reply) packet = new Packet(self, to, id, (Reply) message);
else packet = new Packet(self, to, id, (Request) message);
parent.add(packet, networkJitterNanos(to), TimeUnit.NANOSECONDS);
};
if (to.equals(self) || lookup.apply(to) == null /* client */)
{
parent.messageListener.onMessage(Action.DELIVER, self, to, id, message);
task.run();
return true;
}
Action action = partitioned(to) ? Action.DROP_PARTITIONED
// call actions() per node so each one has different "runs" state
: nodeActions.computeIfAbsent(to, ignore -> actions()).get();
parent.messageListener.onMessage(action, self, to, id, message);
switch (action)
{
case DELIVER:
task.run();
return true;
case DELIVER_WITH_FAILURE:
task.run();
case FAILURE:
if (action == Action.FAILURE)
parent.notifyDropped(self, to, id, message);
if (callback != null)
{
parent.pending.add((PendingRunnable) () -> {
if (callback == callbacks.remove(id))
{
try
{
callback.failure(to, new SimulatedFault("Simulation Failure; src=" + self + ", to=" + to + ", id=" + id + ", message=" + message));
}
catch (Throwable t)
{
callback.onCallbackFailure(to, t);
lookup.apply(self).agent().onUncaughtException(t);
}
}
}, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
}
return false;
case DROP_PARTITIONED:
case DROP:
// TODO (consistency): parent.notifyDropped is a trace logger that is very similar in spirit to MessageListener; can we unify?
parent.notifyDropped(self, to, id, message);
return true;
default:
throw new AssertionError("Unexpected action: " + action);
}
}
private long networkJitterNanos(Node.Id dst)
{
return networkJitter.computeIfAbsent(dst, ignore -> defaultJitter())
.getAsLong();
}
private LongSupplier defaultJitter()
{
return FrequentLargeRange.builder(random)
.ratio(1, 5)
.small(500, TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
.large(50, TimeUnit.MILLISECONDS, 5, SECONDS)
.build()
.asLongSupplier(random);
}
private boolean partitioned(Id to)
{
return parent.partitionSet.contains(self) != parent.partitionSet.contains(to);
}
private Supplier<Action> actions()
{
Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
Gen<Action> actionGen = rs -> {
if (drops.next(rs))
return Action.DROP;
return failures.next(rs) ?
rs.nextBoolean() ? Action.FAILURE : Action.DELIVER_WITH_FAILURE
: Action.DELIVER;
};
return actionGen.asSupplier(random);
}
@Override
public void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure)
{
reply(replyingToNode, replyContext, new FailureReply(failure));
}
}