blob: d58222c2ad1835da95fb5c6596b42585369436ca [file] [log] [blame]
package accord.maelstrom;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import accord.local.Node.Id;
import accord.maelstrom.Cluster.Queue;
import accord.maelstrom.Cluster.QueueSupplier;
public class Runner
static class StandardQueue<T> implements Queue<T>
static class Factory implements QueueSupplier
final Random seeds;
Factory(Random seeds)
this.seeds = seeds;
public <T> Queue<T> get()
return new StandardQueue<>(new Random(seeds.nextLong()));
static class Item<T> implements Comparable<Item<T>>
final long time;
final int seq;
final T item;
Item(long time, int seq, T item)
this.time = time;
this.seq = seq;
this.item = item;
public int compareTo(Item<T> that)
int c =, that.time);
if (c == 0) c =, that.seq);
return c;
final PriorityQueue<Item<T>> queue = new PriorityQueue<>();
final Random random;
long now;
int seq;
StandardQueue(Random random)
this.random = random;
public void add(T item)
add(item, random.nextInt(500), TimeUnit.MILLISECONDS);
public void add(T item, long delay, TimeUnit units)
queue.add(new Item<>(now + units.toMillis(delay), seq++, item));
public T poll()
Item<T> item = queue.poll();
if (item == null)
return null;
now = item.time;
return item.item;
public int size()
return queue.size();
static class RandomQueue<T> implements Queue<T>
static class Factory implements QueueSupplier
final Random seeds;
Factory(Random seeds)
this.seeds = seeds;
public <T> Queue<T> get()
return new RandomQueue<>(new Random(seeds.nextLong()));
static class Entry<T> implements Comparable<Entry<T>>
final double priority;
final T value;
Entry(double priority, T value)
this.priority = priority;
this.value = value;
public int compareTo(Entry<T> that)
return, that.priority);
final PriorityQueue<Entry<T>> queue = new PriorityQueue<>();
final Random random;
public RandomQueue(Random random)
this.random = random;
public int size()
return queue.size();
public void add(T item)
queue.add(new Entry<>(random.nextDouble(), item));
public void add(T item, long delay, TimeUnit units)
queue.add(new Entry<>(random.nextDouble(), item));
public T poll()
return unwrap(queue.poll());
private static <T> T unwrap(Entry<T> e)
return e == null ? null : e.value;
static <T> Supplier<T> parseOutput(boolean delay, String output, Function<String, T> parse)
return parseOutput(delay, output.split("\n"), parse);
static <T> Supplier<T> parseOutput(boolean delay, BufferedReader output, Function<String, T> parse)
return parseOutput(delay, output.lines().toArray(String[]::new), parse);
static <T> Supplier<T> parseOutput(boolean delay, String[] output, Function<String, T> parse)
long[] nanos = new long[output.length];
String[] commands = new String[output.length];
for (int i = 0 ; i < output.length ; ++i)
String command = output[i];
long at = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(command.substring(0, command.indexOf(' '))));
command = command.substring(command.indexOf(' ') + 1);
if (i > 0 && at <= nanos[i-1]) at = nanos[i-1] + 1;
nanos[i] = at;
commands[i] = command;
long start = System.nanoTime();
return new Supplier<>()
int i = 0;
public T get()
if (i == commands.length)
return null;
while (delay)
long wait = start + nanos[i] - System.nanoTime();
if (wait <= 0) break;
catch (InterruptedException e)
throw new IllegalStateException(e);
return parse.apply(commands[i++]);
static void parseNode(TopologyFactory factory, boolean delay, String output) throws IOException
Main.listen(factory, parseOutput(delay, output, Function.identity()), System.out, System.err);
// TODO: we need to align response ids with the input; for now replies are broken
static void replay(int nodeCount, TopologyFactory factory, boolean delay, Supplier<Packet> input) throws IOException
run(nodeCount, new QueueSupplier()
public <T> Queue<T> get()
return new Queue<>()
public void add(T t)
public void add(T item, long delay, TimeUnit units)
public T poll()
return (T)input.get();
public int size()
return 0;
}, Random::new, factory, () -> null);
static void run(TopologyFactory factory, String ... commands) throws IOException
run(3, factory, commands);
static void run(int nodeCount, TopologyFactory factory, String ... commands) throws IOException
run(nodeCount, new StandardQueue.Factory(new Random()), Random::new, factory, commands);
static void run(int nodeCount, QueueSupplier queueSupplier, Supplier<Random> randomSupplier, TopologyFactory factory, String ... commands) throws IOException
run(nodeCount, queueSupplier, randomSupplier, factory, new Supplier<>()
int i = 0;
public Packet get()
return i == commands.length ? null : Packet.parse(commands[i++]);
static void run(int nodeCount, QueueSupplier queueSupplier, Supplier<Random> randomSupplier, TopologyFactory factory, Supplier<Packet> commands) throws IOException
List<Id> nodes = new ArrayList<>();
for (int i = 1 ; i <= nodeCount ; ++i)
nodes.add(new Id(i));[]::new), queueSupplier, ignore -> {}, randomSupplier, () -> new AtomicLong()::incrementAndGet, factory, commands, System.err);