/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl.basic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.MessageSink;
import accord.api.Scheduler;
import accord.burn.BurnTestConfigurationService;
import accord.burn.TopologyUpdates;
import accord.burn.random.FrequentLargeRange;
import accord.config.LocalConfig;
import accord.config.MutableLocalConfig;
import accord.coordinate.CoordinationAdapter;
import accord.impl.CoordinateDurabilityScheduling;
import accord.impl.MessageListener;
import accord.impl.PrefixedIntHashKey;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.TopologyFactory;
import accord.impl.list.ListStore;
import accord.local.AgentExecutor;
import accord.local.Node.Id;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.messages.LocalRequest;
import accord.messages.Message;
import accord.messages.MessageType;
import accord.messages.Reply;
import accord.messages.Request;
import accord.messages.SafeCallback;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.topology.Topology;
import accord.topology.TopologyRandomizer;
import accord.utils.RandomSource;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import static accord.impl.basic.Cluster.OverrideLinksKind.NONE;
import static accord.impl.basic.Cluster.OverrideLinksKind.RANDOM_BIDIRECTIONAL;
import static accord.impl.basic.NodeSink.Action.DELIVER;
import static accord.impl.basic.NodeSink.Action.DROP;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
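/**
 * A deterministic simulated cluster for the burn tests: implements {@link Scheduler} over a single
 * {@link PendingQueue}, so that message delivery, scheduled tasks and injected network faults all
 * interleave under the control of one {@link RandomSource} and a run is reproducible from its seed.
 */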
public class Cluster implements Scheduler
{
public static final Logger trace = LoggerFactory.getLogger("accord.impl.basic.Trace");
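    /** Count of messages sent, maintained per {@link MessageType} in {@link #statsMap} */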
public static class Stats
{
int count;
public int count() { return count; }
public String toString() { return Integer.toString(count); }
}
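    /**
     * The two link functions the cluster alternates between: {@code defaultLinks} supplies the baseline
     * behaviour for every (from, to) pair, while {@code overrideLinks} periodically derives a faulty
     * variant (partitions, dropped or delayed links) from the current list of nodes.
     */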
public static class LinkConfig
{
final Function<List<Id>, BiFunction<Id, Id, Link>> overrideLinks;
final BiFunction<Id, Id, Link> defaultLinks;
public LinkConfig(Function<List<Id>, BiFunction<Id, Id, Link>> overrideLinks, BiFunction<Id, Id, Link> defaultLinks)
{
this.overrideLinks = overrideLinks;
this.defaultLinks = defaultLinks;
}
}
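    /** The behaviour of one directed link: what to do with each message, and with what latency to deliver it */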
static class Link
{
final Supplier<NodeSink.Action> action;
final LongSupplier latencyMicros;
Link(Supplier<NodeSink.Action> action, LongSupplier latencyMicros)
{
this.action = action;
this.latencyMicros = latencyMicros;
}
}
final Map<MessageType, Stats> statsMap = new HashMap<>();
final RandomSource random;
final LinkConfig linkConfig;
final Function<Id, Node> lookup;
final PendingQueue pending;
final Runnable checkFailures;
final List<Runnable> onDone = new ArrayList<>();
final Consumer<Packet> responseSink;
final Map<Id, NodeSink> sinks = new HashMap<>();
final MessageListener messageListener;
int clock;
int recurring;
BiFunction<Id, Id, Link> links;
public Cluster(RandomSource random, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, Node> lookup, IntSupplier rf, Consumer<Packet> responseSink)
{
this.random = random;
this.messageListener = messageListener;
this.pending = queueSupplier.get();
this.checkFailures = checkFailures;
this.lookup = lookup;
this.responseSink = responseSink;
this.linkConfig = defaultLinkConfig(random, rf);
this.links = linkConfig.defaultLinks;
}
NodeSink create(Id self, RandomSource random)
{
NodeSink sink = new NodeSink(self, lookup, this, random);
sinks.put(self, sink);
return sink;
}
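    /**
     * Enqueue a packet for delivery after {@code delay}, recording per-type stats; packets addressed to an
     * id with no registered node (i.e. a client) are routed to the response sink instead.
     */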
void add(Packet packet, long delay, TimeUnit unit)
{
MessageType type = packet.message.type();
if (type != null)
statsMap.computeIfAbsent(type, ignore -> new Stats()).count++;
if (trace.isTraceEnabled())
trace.trace("{} {} {}", clock++, packet.message instanceof Reply ? "RPLY" : "SEND", packet);
if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
else pending.add(packet, delay, unit);
}
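    /** Drain everything currently pending and process it; any work scheduled as a consequence is left queued */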
    public void processAll()
    {
        List<Object> drain = new ArrayList<>();
        while (pending.size() > 0)
            drain.add(pending.poll());
        for (Object next : drain)
            processNext(next);
    }
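    /**
     * Process one pending item, if any remains beyond the registered recurring tasks; returns false once
     * only recurring tasks are left, so callers can loop on this method until the simulation quiesces.
     */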
public boolean processPending()
{
checkFailures.run();
if (pending.size() == recurring)
return false;
Object next = pending.poll();
if (next == null)
return false;
processNext(next);
checkFailures.run();
return true;
}
private void processNext(Object next)
{
if (next instanceof Packet)
{
Packet deliver = (Packet) next;
Node on = lookup.apply(deliver.dst);
if (trace.isTraceEnabled())
trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
if (deliver.message instanceof Reply)
{
Reply reply = (Reply) deliver.message;
SafeCallback callback = reply.isFinal()
? sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
: sinks.get(deliver.dst).callbacks.get(deliver.replyId);
if (callback != null)
{
if (reply instanceof Reply.FailureReply) callback.failure(deliver.src, ((Reply.FailureReply) reply).failure);
else callback.success(deliver.src, reply);
}
}
else on.receive((Request) deliver.message, deliver.src, deliver);
}
else
{
((Runnable) next).run();
}
}
public void notifyDropped(Node.Id from, Node.Id to, long id, Message message)
{
if (trace.isTraceEnabled())
trace.trace("{} DROP[{}] (from:{}, to:{}, {}:{}, body:{})", clock++, lookup.apply(to).epoch(), from, to, message instanceof Reply ? "replyTo" : "id", id, message);
}
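    /**
     * Recurring tasks are counted so that {@link #processPending} can detect quiescence: once only
     * recurring tasks remain in the queue, there is no real work left to perform.
     */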
@Override
public Scheduled recurring(Runnable run, long delay, TimeUnit units)
{
RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, delay, units);
++recurring;
result.onCancellation(() -> --recurring);
pending.add(result, delay, units);
return result;
}
@Override
public Scheduled once(Runnable run, long delay, TimeUnit units)
{
RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, delay, units);
pending.add(result, delay, units);
return result;
}
public void onDone(Runnable run)
{
onDone.add(run);
}
@Override
public void now(Runnable run)
{
run.run();
}
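    /**
     * Drive a complete simulation: build and start a node per id, then interleave the client packets
     * supplied by {@code in} with periodic chaos (link overrides, topology changes, durability
     * re-scheduling) until the input is exhausted and the cluster quiesces; returns per-type message counts.
     */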
public static Map<MessageType, Stats> run(Id[] nodes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier,
BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier,
Runnable checkFailures, Consumer<Packet> responseSink,
Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier,
TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
Consumer<Map<Id, Node>> readySignal)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> nodeMap = new LinkedHashMap<>();
Map<Id, AgentExecutor> executorMap = new LinkedHashMap<>();
try
{
RandomSource random = randomSupplier.get();
Cluster sinks = new Cluster(randomSupplier.get(), messageListener, queueSupplier, checkFailures, nodeMap::get, () -> topologyFactory.rf, responseSink);
TopologyUpdates topologyUpdates = new TopologyUpdates(executorMap::get);
TopologyRandomizer.Listener schemaApply = t -> {
for (Node node : nodeMap.values())
{
ListStore store = (ListStore) node.commandStores().dataStore();
store.onTopologyUpdate(node, t);
}
};
TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get, schemaApply);
List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>();
for (Id id : nodes)
{
MessageSink messageSink = sinks.create(id, randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
LocalConfig localConfig = new MutableLocalConfig();
BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) -> configRandomizer.onStale(id, sinceAtLeast, ranges);
AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, onStale);
executorMap.put(id, nodeExecutor);
BurnTestConfigurationService configService = new BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, nodeMap::get, topologyUpdates);
BooleanSupplier isLoadedCheck = random.biasedUniformBools(0.5f);
Node node = new Node(id, messageSink, LocalRequest::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
() -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()),
nodeExecutor.agent(),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck), new CoordinationAdapter.DefaultFactory(),
localConfig);
                nodeMap.put(id, node);
                // one durability scheduler per node; its rates are set (and periodically re-randomised) by updateDurabilityRate below
                durabilityScheduling.add(new CoordinateDurabilityScheduling(node));
}
Runnable updateDurabilityRate;
{
IntSupplier frequencySeconds = random.biasedUniformIntsSupplier( 1, 120, 10, 30, 10, 60).get();
IntSupplier shardCycleTimeSeconds = random.biasedUniformIntsSupplier(5, 60, 10, 30, 1, 30).get();
IntSupplier globalCycleTimeSeconds = random.biasedUniformIntsSupplier( 1, 90, 10, 30, 10, 60).get();
updateDurabilityRate = () -> {
int f = frequencySeconds.getAsInt();
int s = shardCycleTimeSeconds.getAsInt();
int g = globalCycleTimeSeconds.getAsInt();
durabilityScheduling.forEach(d -> {
d.setFrequency(f, SECONDS);
d.setShardCycleTime(s, SECONDS);
d.setGlobalCycleTime(g, SECONDS);
});
};
}
updateDurabilityRate.run();
schemaApply.onUpdate(topology);
// startup
AsyncResult<?> startup = AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
while (sinks.processPending());
Assertions.assertTrue(startup.isDone());
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
Scheduled chaos = sinks.recurring(() -> {
sinks.links = sinks.linkConfig.overrideLinks.apply(nodesList);
if (random.decide(0.1f))
updateDurabilityRate.run();
}, 5L, SECONDS);
Scheduled reconfigure = sinks.recurring(configRandomizer::maybeUpdateTopology, 1, SECONDS);
durabilityScheduling.forEach(CoordinateDurabilityScheduling::start);
noMoreWorkSignal.accept(() -> {
reconfigure.cancel();
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
});
readySignal.accept(nodeMap);
Packet next;
while ((next = in.get()) != null)
sinks.add(next, 0, TimeUnit.NANOSECONDS);
while (sinks.processPending());
chaos.cancel();
reconfigure.cancel();
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
sinks.links = sinks.linkConfig.defaultLinks;
// give progress log et al a chance to finish
// TODO (desired, testing): would be nice to make this more certain than an arbitrary number of additional rounds
for (int i = 0 ; i < 10 ; ++i)
{
sinks.processAll();
while (sinks.processPending());
}
while (!sinks.onDone.isEmpty())
{
List<Runnable> onDone = new ArrayList<>(sinks.onDone);
sinks.onDone.clear();
onDone.forEach(Runnable::run);
while (sinks.processPending());
}
return sinks.statsMap;
}
finally
{
nodeMap.values().forEach(Node::shutdown);
}
}
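    /**
     * Isolate a randomly chosen minority of nodes (always fewer than a quorum of the replication factor)
     * by dropping every message that crosses the partition boundary; links within either side keep using {@code up}.
     */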
private static BiFunction<Id, Id, Link> partition(List<Id> nodes, RandomSource random, int rf, BiFunction<Id, Id, Link> up)
{
Collections.shuffle(nodes, random.asJdkRandom());
int partitionSize = random.nextInt((rf+1)/2);
Set<Id> partition = new LinkedHashSet<>(nodes.subList(0, partitionSize));
BiFunction<Id, Id, Link> down = (from, to) -> new Link(() -> DROP, up.apply(from, to).latencyMicros);
return (from, to) -> (partition.contains(from) == partition.contains(to) ? up : down).apply(from, to);
}
    /**
     * Pair each node with one other node, and apply a network behaviour override to one direction of each pair
     */
private static BiFunction<Id, Id, Link> pairedUnidirectionalOverrides(Function<Link, Link> linkOverride, List<Id> nodes, RandomSource random, BiFunction<Id, Id, Link> fallback)
{
Map<Id, Map<Id, Link>> map = new HashMap<>();
Collections.shuffle(nodes, random.asJdkRandom());
for (int i = 0 ; i + 1 < nodes.size() ; i += 2)
{
Id from = nodes.get(i);
Id to = nodes.get(i + 1);
Link link = linkOverride.apply(fallback.apply(from, to));
map.put(from, singletonMap(to, link));
}
return (from, to) -> nonNullOrGet(map.getOrDefault(from, emptyMap()).get(to), from, to, fallback);
}
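    /**
     * Apply a behaviour override to {@code count} distinct directed links chosen at random; when
     * {@code bidirectional}, the reverse direction of each chosen link is overridden as well.
     */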
private static BiFunction<Id, Id, Link> randomOverrides(boolean bidirectional, Function<Link, Link> linkOverride, int count, List<Id> nodes, RandomSource random, BiFunction<Id, Id, Link> fallback)
{
Map<Id, Map<Id, Link>> map = new HashMap<>();
while (count > 0)
{
Id from = nodes.get(random.nextInt(nodes.size()));
Id to = nodes.get(random.nextInt(nodes.size()));
Link fwd = linkOverride.apply(fallback.apply(from, to));
if (null == map.computeIfAbsent(from, ignore -> new HashMap<>()).putIfAbsent(to, fwd))
{
if (bidirectional)
{
Link rev = linkOverride.apply(fallback.apply(to, from));
map.computeIfAbsent(to, ignore -> new HashMap<>()).put(from, rev);
}
--count;
}
}
return (from, to) -> nonNullOrGet(map.getOrDefault(from, emptyMap()).get(to), from, to, fallback);
}
private static Link nonNullOrGet(Link ifNotNull, Id from, Id to, BiFunction<Id, Id, Link> function)
{
if (ifNotNull != null)
return ifNotNull;
return function.apply(from, to);
}
private static Link healthy(LongSupplier latency)
{
return new Link(() -> DELIVER, latency);
}
private static Link down(LongSupplier latency)
{
return new Link(() -> DROP, latency);
}
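    /**
     * Baseline link latency: a random walk mixing small delays (500µs-5ms) with occasional large ones
     * (50ms-5s), the blend controlled by the 1:5 ratio passed to {@link FrequentLargeRange}.
     */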
private LongSupplier defaultRandomWalkLatencyMicros(RandomSource random)
{
LongSupplier range = FrequentLargeRange.builder(random)
.ratio(1, 5)
.small(500, TimeUnit.MICROSECONDS, 5, MILLISECONDS)
.large(50, MILLISECONDS, 5, SECONDS)
.build().asLongSupplier(random);
return () -> NANOSECONDS.toMicros(range.getAsLong());
}
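    /**
     * Kinds of single-link override, and a supplier of functions that perturb one {@link Link}
     * accordingly: replacing its delivery action, its latency, or both with random alternatives.
     */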
enum OverrideLinkKind { LATENCY, ACTION, BOTH }
private Supplier<Function<Link, Link>> linkOverrideSupplier(RandomSource random)
{
Supplier<OverrideLinkKind> nextKind = random.randomWeightedPicker(OverrideLinkKind.values());
Supplier<LongSupplier> latencySupplier = random.biasedUniformLongsSupplier(
MILLISECONDS.toMicros(1L), SECONDS.toMicros(2L),
MILLISECONDS.toMicros(1L), MILLISECONDS.toMicros(300L), SECONDS.toMicros(1L),
MILLISECONDS.toMicros(1L), MILLISECONDS.toMicros(300L), SECONDS.toMicros(1L)
);
NodeSink.Action[] actions = NodeSink.Action.values();
Supplier<Supplier<NodeSink.Action>> actionSupplier = () -> random.randomWeightedPicker(actions);
return () -> {
OverrideLinkKind kind = nextKind.get();
switch (kind)
{
default: throw new AssertionError("Unhandled: " + kind);
case BOTH: return ignore -> new Link(actionSupplier.get(), latencySupplier.get());
case ACTION: return override -> new Link(actionSupplier.get(), override.latencyMicros);
case LATENCY: return override -> new Link(override.action, latencySupplier.get());
}
};
}
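    /**
     * Kinds of cluster-wide override, and the producer of the override applied by the periodic chaos task:
     * optionally partitions the network, then overlays per-link faults on a selection of links
     * (none, paired unidirectional, or randomly chosen uni-/bidirectional links).
     */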
enum OverrideLinksKind { NONE, PAIRED_UNIDIRECTIONAL, RANDOM_UNIDIRECTIONAL, RANDOM_BIDIRECTIONAL }
private Function<List<Id>, BiFunction<Id, Id, Link>> overrideLinks(RandomSource random, IntSupplier rf, BiFunction<Id, Id, Link> defaultLinks)
{
Supplier<Function<Link, Link>> linkOverrideSupplier = linkOverrideSupplier(random);
BooleanSupplier partitionChance = random.biasedUniformBools(random.nextFloat());
Supplier<OverrideLinksKind> nextKind = random.randomWeightedPicker(OverrideLinksKind.values());
return nodesList -> {
BiFunction<Id, Id, Link> links = defaultLinks;
            if (partitionChance.getAsBoolean()) // whole-network partition, with probability chosen at random per run
links = partition(nodesList, random, rf.getAsInt(), links);
OverrideLinksKind kind = nextKind.get();
if (kind == NONE)
return links;
Function<Link, Link> linkOverride = linkOverrideSupplier.get();
switch (kind)
{
default: throw new AssertionError("Unhandled: " + kind);
case PAIRED_UNIDIRECTIONAL:
return pairedUnidirectionalOverrides(linkOverride, nodesList, random, defaultLinks);
case RANDOM_BIDIRECTIONAL:
case RANDOM_UNIDIRECTIONAL:
boolean bidirectional = kind == RANDOM_BIDIRECTIONAL;
int count = random.nextInt(bidirectional || random.nextBoolean() ? nodesList.size() : Math.max(1, (nodesList.size() * nodesList.size())/2));
return randomOverrides(bidirectional, linkOverride, count, nodesList, random, defaultLinks);
}
};
}
private BiFunction<Id, Id, Link> defaultLinks(RandomSource random)
{
return caching((from, to) -> healthy(defaultRandomWalkLatencyMicros(random)));
}
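    /** Memoize the link for each (from, to) pair, so the same pair of nodes always sees the same baseline link */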
private BiFunction<Id, Id, Link> caching(BiFunction<Id, Id, Link> uncached)
{
Map<Id, Map<Id, Link>> stash = new HashMap<>();
return (from, to) -> stash.computeIfAbsent(from, ignore -> new HashMap<>())
.computeIfAbsent(to, ignore -> uncached.apply(from, to));
}
private LinkConfig defaultLinkConfig(RandomSource random, IntSupplier rf)
{
BiFunction<Id, Id, Link> defaultLinks = defaultLinks(random);
Function<List<Id>, BiFunction<Id, Id, Link>> overrideLinks = overrideLinks(random, rf, defaultLinks);
return new LinkConfig(overrideLinks, defaultLinks);
}
}