blob: f9aad092195ff8f816e05cad49c3f4c689569a4b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.maelstrom;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.TreeSet;
import accord.api.Key;
import accord.local.Node;
import accord.local.Node.Id;
import accord.maelstrom.Packet.Type;
import accord.messages.MessageType;
import accord.messages.Reply;
import accord.messages.ReplyContext;
import accord.messages.Request;
import accord.primitives.Keys;
import accord.primitives.Txn;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
public class MaelstromRequest extends Body implements Request
{
final Txn txn;
public MaelstromRequest(long msg_id, Txn txn)
{
super(Type.txn, msg_id, SENTINEL_MSG_ID);
this.txn = txn;
}
@Override
public void process(Node node, Id client, ReplyContext replyContext)
{
node.coordinate(txn).addCallback((success, fail) -> {
Reply reply = success != null ? new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success) : null;
node.reply(client, replyContext, reply, fail);
});
}
@Override
public MessageType type()
{
throw new UnsupportedOperationException();
}
@Override
void writeBody(JsonWriter out) throws IOException
{
super.writeBody(out);
out.name("txn");
writeTxnExternal(out, txn);
}
static void writeTxnExternal(JsonWriter out, Txn txn) throws IOException
{
if (txn == null)
{
out.nullValue();
return;
}
out.beginArray();
Keys keys = (Keys)txn.keys();
MaelstromRead read = (MaelstromRead) txn.read();
MaelstromUpdate update = (MaelstromUpdate) txn.update();
for (int i = 0 ; i < keys.size() ; ++i)
{
MaelstromKey.Key key = (MaelstromKey.Key) keys.get(i);
if (read.readKeys.indexOf(key) >= 0)
{
out.beginArray();
out.value("r");
key.datum.write(out);
out.nullValue();
out.endArray();
}
if (update.containsKey(key))
{
out.beginArray();
out.value("append");
key.datum.write(out);
update.get(key).write(out);
out.endArray();
}
}
out.endArray();
}
public static Txn readTxnExternal(JsonReader in, Node.Id client, long requestId) throws IOException
{
if (in.peek() == JsonToken.NULL)
return null;
NavigableSet<Key> buildReadKeys = new TreeSet<>();
NavigableSet<Key> buildKeys = new TreeSet<>();
MaelstromUpdate update = new MaelstromUpdate();
in.beginArray();
while (in.hasNext())
{
in.beginArray();
String op = in.nextString();
Key key = MaelstromKey.readKey(in);
switch (op)
{
default: throw new IllegalStateException("Invalid op: " + op);
case "r":
in.nextNull();
buildReadKeys.add(key);
break;
case "append":
Datum value = Datum.read(in);
buildKeys.add(key);
update.merge(key, new Value(value), Value::append);
}
in.endArray();
}
in.endArray();
buildKeys.addAll(buildReadKeys);
Keys readKeys = new Keys(buildReadKeys);
Keys keys = new Keys(buildKeys);
MaelstromRead read = new MaelstromRead(readKeys, keys);
MaelstromQuery query = new MaelstromQuery(client, requestId);
return new Txn.InMemory(keys, read, query, update);
}
}