blob: 7dade5d48f97d7bbf7cfb15c475dc214d0cd3238 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.scuttlebutt.rpc;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
import java.io.IOException;
import java.util.Optional;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Decoded RPC message, making elements of the message available directly.
*/
public final class RPCMessage {
private final byte rpcFlags;
private final boolean stream;
private final boolean lastMessageOrError;
private final RPCFlag.BodyType bodyType;
private final Bytes body;
private final int requestNumber;
/**
* Default constructor
*
* @param messageBytes the bytes of the encoded message.
*/
public RPCMessage(Bytes messageBytes) {
rpcFlags = messageBytes.get(0);
stream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);
lastMessageOrError = RPCFlag.EndOrError.END.isApplied(rpcFlags);
if (RPCFlag.BodyType.JSON.isApplied(rpcFlags)) {
bodyType = RPCFlag.BodyType.JSON;
} else if (RPCFlag.BodyType.UTF_8_STRING.isApplied(rpcFlags)) {
bodyType = RPCFlag.BodyType.UTF_8_STRING;
} else {
bodyType = RPCFlag.BodyType.BINARY;
}
int bodySize = messageBytes.slice(1, 4).toInt();
requestNumber = messageBytes.slice(5, 4).toInt();
if (messageBytes.size() < bodySize + 9) {
throw new IllegalArgumentException(
"Message body " + (messageBytes.size() - 9) + " is less than body size " + bodySize);
}
body = messageBytes.slice(9, bodySize);
}
/**
* Indicates if the message is part of a stream.
*
* @return true if the message if part of a stream
*/
public boolean stream() {
return stream;
}
/**
* Indicates if the message is either the last in the stream or an error message.
*
* @return true if this message is the last one, or an error
*/
public boolean lastMessageOrError() {
return lastMessageOrError;
}
/**
*
* @return true if this is a last message in a stream, and it is not an error
*/
public boolean isSuccessfulLastMessage() {
return lastMessageOrError() && asString().equals("true");
}
/**
*
* @return true if this is an error message response
*/
public boolean isErrorMessage() {
return lastMessageOrError && !isSuccessfulLastMessage();
}
/**
* @param objectMapper the object mapper to deserialize with
* @return the RPC error response body, if this is an error response - nothing otherwise
*/
public Optional<RPCErrorBody> getErrorBody(ObjectMapper objectMapper) {
if (!isErrorMessage()) {
// If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
return Optional.empty();
} else {
try {
return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
} catch (IOException e) {
return Optional.empty();
}
}
}
/**
*
* @param objectMapper the objectmatter to deserialize the error with.
*
* @return an exception if this represents an error RPC response, otherwise nothing
*/
public Optional<RPCRequestFailedException> getException(ObjectMapper objectMapper) {
if (isErrorMessage()) {
Optional<RPCRequestFailedException> exception =
getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage()));
if (!exception.isPresent()) {
// If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation
// which prevented it returning the correct type, so we just print whatever it returned
return Optional.of(new RPCRequestFailedException(this.asString()));
} else {
return exception;
}
} else {
return Optional.empty();
}
}
/**
* Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
*
* @return the type of the body: a binary message, a UTF-8 string or a JSON message
*/
public RPCFlag.BodyType bodyType() {
return bodyType;
}
/**
* Provides the request number of the message.
*
* @return the request number of the message
*/
public int requestNumber() {
return requestNumber;
}
/**
* Provides the body of the message.
*
* @return the bytes of the body of the message
*/
public Bytes body() {
return body;
}
/**
* Provide the RPC flags set on the message.
*
* @return the RPC flags set on the message as a single byte.
*/
public byte rpcFlags() {
return rpcFlags;
}
/**
* Provides the body of the message as a UTF-8 string.
*
* @return the body of the message as a UTF-8 string
*/
public String asString() {
return new String(body().toArrayUnsafe(), UTF_8);
}
/**
* Provides the body of the message, marshalled as a JSON object.
*
* @param objectMapper the object mapper to deserialize with
* @param clazz the JSON object class
* @param <T> the matching JSON object class
* @return a new instance of the JSON object class
* @throws IOException if an error occurs during marshalling
*/
public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
}
}