blob: 0c9bdea849dec4e8a725a43c66ff24a84013117a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.coordinate.tracking;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.EnumMap;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import accord.burn.TopologyUpdates;
import accord.impl.IntHashKey;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.TestAgent;
import accord.impl.TopologyFactory;
import accord.impl.basic.RandomDelayQueue;
import accord.impl.basic.SimulatedDelayedExecutorService;
import accord.local.AgentExecutor;
import accord.local.Node.Id;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.topology.TopologyRandomizer;
import accord.utils.DefaultRandom;
import accord.utils.RandomSource;
public abstract class TrackerReconciler<ST extends ShardTracker, T extends AbstractTracker<ST>, E extends Enum<E>>
{
final RandomSource random;
final E[] events;
final EnumMap<E, Integer>[] counts;
final T tracker;
final List<Id> inflight;
protected TrackerReconciler(RandomSource random, Class<E> events, T tracker, List<Id> inflight)
{
this.random = random;
this.events = events.getEnumConstants();
this.tracker = tracker;
this.inflight = inflight;
this.counts = new EnumMap[tracker.trackers.length];
for (int i = 0 ; i < counts.length ; ++i)
{
counts[i] = new EnumMap<>(events);
for (E event : this.events)
counts[i].put(event, 0);
}
}
Topologies topologies()
{
return tracker.topologies;
}
void test()
{
while (true)
{
Assertions.assertFalse(inflight.isEmpty());
E next = events[random.nextInt(events.length)];
Id from = inflight.get(random.nextInt(inflight.size()));
RequestStatus newStatus = invoke(next, tracker, from);
for (int i = 0 ; i < topologies().size() ; ++i)
{
topologies().get(i).forEachOn(from, (s, si) -> {
counts[si].compute(next, (ignore, cur) -> cur + 1);
});
}
validate(newStatus);
if (newStatus != RequestStatus.NoChange)
break;
}
}
abstract RequestStatus invoke(E event, T tracker, Id from);
abstract void validate(RequestStatus status);
protected static <ST extends ShardTracker, T extends AbstractTracker<ST>, E extends Enum<E>>
List<TrackerReconciler<ST, T, E>> generate(long seed, BiFunction<RandomSource, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
{
System.out.println("seed: " + seed);
RandomSource random = new DefaultRandom(seed);
SimulatedDelayedExecutorService executor = new SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new TestAgent());
return topologies(random, executor).map(topologies -> constructor.apply(random, topologies))
.collect(Collectors.toList());
}
// TODO (required, testing): generalise and parameterise topology generation a bit more
// also, select a subset of the generated topologies to correctly simulate topology consumption logic
private static Stream<Topologies> topologies(RandomSource random, AgentExecutor executor)
{
TopologyFactory factory = new TopologyFactory(2 + random.nextInt(3), IntHashKey.ranges(4 + random.nextInt(12)));
List<Id> nodes = cluster( 1 + random.nextInt(factory.shardRanges.length));
Topology topology = factory.toTopology(nodes);
int count = 1 + random.nextInt(3);
List<Topologies> result = new ArrayList<>();
result.add(new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology));
if (count == 1)
return result.stream();
Deque<Topology> topologies = new ArrayDeque<>();
topologies.add(topology);
TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
TopologyRandomizer configRandomizer = new TopologyRandomizer(() -> random, topology, topologyUpdates, null);
while (--count > 0)
{
Topology next = configRandomizer.updateTopology();
while (next == null)
next = configRandomizer.updateTopology();
topologies.addFirst(next);
result.add(new Topologies.Multi(SizeOfIntersectionSorter.SUPPLIER, topologies.toArray(new Topology[0])));
}
return result.stream();
}
private static List<Id> cluster(int count)
{
List<Id> cluster = new ArrayList<>();
for (int i = 1 ; i <= count ; ++i)
cluster.add(new Id(i));
return cluster;
}
}