blob: 1df8f9dc6675e75006915b6572c63153e2c73efe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.maelstrom;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import accord.coordinate.Timeout;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
import accord.impl.SizeOfIntersectionSorter;
import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
import accord.local.ShardDistributor;
import accord.messages.ReplyContext;
import accord.topology.Topology;
import accord.utils.DefaultRandom;
import accord.utils.ThreadPoolScheduler;
import accord.maelstrom.Packet.Type;
import accord.api.MessageSink;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
public class Main
{
static class CallbackInfo
{
final Callback callback;
final Id to;
final long timeout;
CallbackInfo(Callback callback, Id to, long timeout)
{
this.callback = callback;
this.to = to;
this.timeout = timeout;
}
}
public static class StdoutSink implements MessageSink
{
private final AtomicLong nextMessageId = new AtomicLong(1);
private final Map<Long, CallbackInfo> callbacks = new ConcurrentHashMap<>();
final LongSupplier nowSupplier;
final Scheduler scheduler;
final long start;
final Id self;
final PrintStream out, err;
public StdoutSink(LongSupplier nowSupplier, Scheduler scheduler, long start, Id self, PrintStream stdout, PrintStream stderr)
{
this.nowSupplier = nowSupplier;
this.scheduler = scheduler;
this.start = start;
this.self = self;
this.out = stdout;
this.err = stderr;
this.scheduler.recurring(() -> {
long now = nowSupplier.getAsLong();
callbacks.forEach((messageId, info) -> {
if (info.timeout < now && callbacks.remove(messageId, info))
info.callback.onFailure(info.to, new Timeout(null, null));
});
}, 1L, TimeUnit.SECONDS);
}
private void send(Packet packet)
{
err.println("Sending " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + packet);
err.flush();
out.println(packet);
out.flush();
}
public synchronized void send(Id to, Body body)
{
send(new Packet(self, to, body));
}
@Override
public synchronized void send(Id to, Request send)
{
send(new Packet(self, to, Body.SENTINEL_MSG_ID, send));
}
@Override
public void send(Id to, Request send, AgentExecutor ignored, Callback callback)
{
// Executor is ignored due to the fact callbacks are applied in a single thread already
long messageId = nextMessageId.incrementAndGet();
callbacks.put(messageId, new CallbackInfo(callback, to, nowSupplier.getAsLong() + 1000L));
send(new Packet(self, to, messageId, send));
}
@Override
public void reply(Id replyToNode, ReplyContext replyContext, Reply reply)
{
send(new Packet(self, replyToNode, MaelstromReplyContext.messageIdFor(replyContext), reply));
}
}
public static void listen(TopologyFactory topologyFactory, InputStream stdin, PrintStream out, PrintStream err) throws IOException
{
try (BufferedReader in = new BufferedReader(new InputStreamReader(stdin)))
{
listen(topologyFactory, () -> {
try
{
return in.readLine();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}, out, err);
}
}
public static void listen(TopologyFactory topologyFactory, Supplier<String> in, PrintStream out, PrintStream err) throws IOException
{
long start = System.nanoTime();
err.println("Starting...");
err.flush();
ThreadPoolScheduler scheduler = new ThreadPoolScheduler();
Node on;
Topology topology;
StdoutSink sink;
{
String line = in.get();
err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
err.flush();
Packet packet = Json.GSON.fromJson(line, Packet.class);
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
}
try
{
while (true)
{
String line = in.get();
if (line == null)
{
err.println("Received EOF; terminating");
err.flush();
scheduler.stop();
err.println("Terminated");
err.flush();
return;
}
err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
err.flush();
Packet next = Packet.parse(line);
switch (next.body.type)
{
case txn:
on.receive((MaelstromRequest)next.body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
break;
default:
if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
{
Reply reply = (Reply)((Wrapper)next.body).body;
CallbackInfo callback = sink.callbacks.remove(next.body.in_reply_to);
if (callback != null)
scheduler.now(() -> {
try
{
callback.callback.onSuccess(next.src, reply);
}
catch (Throwable t)
{
callback.callback.onCallbackFailure(next.src, t);
}
});
}
else on.receive((Request)((Wrapper)next.body).body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
}
}
}
finally
{
on.shutdown();
}
}
public static void main(String[] args) throws IOException
{
listen(new TopologyFactory(64, 3), System.in, System.out, System.err);
}
}