blob: 6fae94718683e33c5e188be73b92ba5148ea63ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.scuttlebutt.rpc;
import org.apache.tuweni.bytes.Bytes;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Codec responsible for encoding RPC requests and responses.
 * <p>
 * All methods are static. The class keeps shared state in the form of a process-wide counter that is used to hand out
 * a different request number to each request that does not specify one explicitly.
 */
public final class RPCCodec {

  /** Holds the request number that the next automatically numbered request will use. */
  static final AtomicInteger counter = new AtomicInteger(1);

  /** Shared JSON mapper, used to serialize the body of stream end messages. */
  private static final ObjectMapper mapper = new ObjectMapper();

  /**
   * Provides the next request number, always equal or greater than one.
   * <p>
   * The read and the update of the counter happen in a single compare-and-set so that concurrent callers never receive
   * the same number. Once {@link Integer#MAX_VALUE} has been handed out the sequence restarts at one.
   *
   * @return the request number to use for the next request
   */
  private static int nextRequestNumber() {
    while (true) {
      int current = counter.get();
      // The counter is package-visible: recover if it was ever set to an invalid (non positive) value.
      int requestNumber = current < 1 ? 1 : current;
      // Wrap around explicitly rather than overflowing into negative numbers.
      int next = requestNumber == Integer.MAX_VALUE ? 1 : requestNumber + 1;
      if (counter.compareAndSet(current, next)) {
        return requestNumber;
      }
      // Another thread changed the counter in between, retry with the fresh value.
    }
  }

  /**
   * Folds the given flags into a single flag byte.
   *
   * @param initial the flag byte to start from
   * @param flags the flags to apply, in order
   * @return the flag byte obtained after applying every flag
   */
  private static byte applyFlags(byte initial, RPCFlag... flags) {
    byte encodedFlags = initial;
    for (RPCFlag flag : flags) {
      encodedFlags = flag.apply(encodedFlags);
    }
    return encodedFlags;
  }

  /**
   * Encode a message as a RPC request, using the next available request number.
   *
   * @param body the body to encode as a RPC request, encoded as UTF-8
   * @param flags the flags of the RPC request
   * @return the message encoded as a RPC request
   */
  public static Bytes encodeRequest(String body, RPCFlag... flags) {
    return encodeRequest(Bytes.wrap(body.getBytes(StandardCharsets.UTF_8)), nextRequestNumber(), flags);
  }

  /**
   * Encode a message as a RPC request, using the next available request number.
   *
   * @param body the body to encode as a RPC request
   * @param flags the flags of the RPC request
   * @return the message encoded as a RPC request
   */
  public static Bytes encodeRequest(Bytes body, RPCFlag... flags) {
    return encodeRequest(body, nextRequestNumber(), flags);
  }

  /**
   * Encode a message as a RPC request.
   *
   * @param body the body to encode as a RPC request
   * @param requestNumber the number of the request. Must be equal or greater than one.
   * @param flags the flags of the RPC request
   * @return the message encoded as a RPC request
   * @throws IllegalArgumentException if the request number is lower than one
   */
  public static Bytes encodeRequest(Bytes body, int requestNumber, RPCFlag... flags) {
    if (requestNumber < 1) {
      throw new IllegalArgumentException("Invalid request number");
    }
    return encodeRequest(body, requestNumber, applyFlags((byte) 0, flags));
  }

  /**
   * Encode a message as an RPC request.
   * <p>
   * The encoded message is made of the flag byte, the size of the body, the request number and the body itself, in
   * that order. The request number is not validated by this method.
   *
   * @param body the body to encode as an RPC request
   * @param requestNumber the request number
   * @param flags the flags of the RPC request (already encoded.)
   * @return the message encoded as an RPC request
   */
  public static Bytes encodeRequest(Bytes body, int requestNumber, byte flags) {
    return Bytes
        .concatenate(Bytes.of(flags), Bytes.ofUnsignedInt(body.size()), Bytes.ofUnsignedInt(requestNumber), body);
  }

  /**
   * Encode a message as a response to a RPC request.
   * <p>
   * The encoded message is made of the flag byte, the size of the body, the negated request number and the body
   * itself, in that order.
   *
   * @param body the body to encode as the body of the response
   * @param requestNumber the number of the request being answered. Must be equal or greater than one.
   * @param flagByte the flags of the RPC response encoded as a byte
   * @return the response encoded as a RPC response
   * @throws IllegalArgumentException if the request number is lower than one
   */
  public static Bytes encodeResponse(Bytes body, int requestNumber, byte flagByte) {
    if (requestNumber < 1) {
      throw new IllegalArgumentException("Invalid request number");
    }
    // A response carries the request number negated, written as a 4 byte signed integer.
    byte[] negatedRequestNumber = ByteBuffer.allocate(4).putInt(-requestNumber).array();
    return Bytes
        .concatenate(Bytes.of(flagByte), Bytes.ofUnsignedInt(body.size()), Bytes.wrap(negatedRequestNumber), body);
  }

  /**
   * Encode a message as a response to a RPC request.
   *
   * @param body the body to encode as the body of the response
   * @param requestNumber the number of the request being answered. Must be equal or greater than one.
   * @param flagByte the initial flags of the RPC response encoded as a byte
   * @param flags additional flags to apply on top of {@code flagByte}
   * @return the response encoded as a RPC response
   * @throws IllegalArgumentException if the request number is lower than one
   */
  public static Bytes encodeResponse(Bytes body, int requestNumber, byte flagByte, RPCFlag... flags) {
    return encodeResponse(body, requestNumber, applyFlags(flagByte, flags));
  }

  /**
   * Encodes a message with the body and headers set in the appropriate way to end a stream.
   * <p>
   * The body is the JSON value {@code true} and the message is flagged as a JSON stream end.
   *
   * @param requestNumber the request number. Must be equal or greater than one.
   * @return the stream end message encoded as an RPC request
   * @throws JsonProcessingException if the encoding fails
   * @throws IllegalArgumentException if the request number is lower than one
   */
  public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
    byte[] body = mapper.writeValueAsBytes(Boolean.TRUE);
    return encodeRequest(
        Bytes.wrap(body),
        requestNumber,
        RPCFlag.EndOrError.END,
        RPCFlag.BodyType.JSON,
        RPCFlag.Stream.STREAM);
  }

  /**
   * Encode a message as a response to a RPC request.
   *
   * @param body the body to encode as the body of the response
   * @param requestNumber the number of the request being answered. Must be equal or greater than one.
   * @param flags the flags of the RPC response
   * @return the response encoded as a RPC response
   * @throws IllegalArgumentException if the request number is lower than one
   */
  public static Bytes encodeResponse(Bytes body, int requestNumber, RPCFlag... flags) {
    return encodeResponse(body, requestNumber, (byte) 0, flags);
  }
}