blob: 1cb7a4726f559b532d1568670d978057707a6a67 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.tuweni.rlpx;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.bytes.MutableBytes;
import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.rlp.RLP;
import org.apache.tuweni.rlpx.wire.HelloMessage;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;
import org.bouncycastle.crypto.digests.KeccakDigest;
import org.bouncycastle.crypto.engines.AESEngine;
import org.bouncycastle.crypto.modes.SICBlockCipher;
import org.bouncycastle.crypto.params.KeyParameter;
import org.bouncycastle.crypto.params.ParametersWithIV;
import org.xerial.snappy.Snappy;
/**
 * Connection between 2 peers over the RLPx protocol.
 * <p>
 * The RLPx protocol creates an exchange of unique secrets during an initial handshake. The peers proceed to communicate
 * using the shared secrets.
 * <p>
 * This connection allows encrypting and decrypting messages with a remote peer.
 */
public final class RLPxConnection {
* Checks if two RLPx connections represent both ends of a connection.
* <p>
* Used for testing.
* @param one one RLPx connection
* @param other an other RLPx connection
* @return true if both connections refer to the same peers, on each side of the connection
public static boolean isComplementedBy(RLPxConnection one, RLPxConnection other) {
return Objects.equals(one.aesSecret, other.aesSecret)
&& Objects.equals(one.macSecret, other.macSecret)
&& Objects.equals(one.token, other.token)
&& Objects.equals(snapshot(one.egressMac), snapshot(other.ingressMac))
&& Objects.equals(snapshot(one.ingressMac), snapshot(other.egressMac));
private static Bytes32 snapshot(KeccakDigest digest) {
byte[] out = new byte[32];
new KeccakDigest(digest).doFinal(out, 0);
return Bytes32.wrap(out);
// Shared AES secret; keys the per-call ciphers built in readFrame and write.
private final Bytes32 aesSecret;
// Shared MAC secret; keys macEncryptionEngine in the constructor.
private final Bytes32 macSecret;
// Token associated with the connection; in the visible code it is only compared and hashed.
private final Bytes32 token;
// Running Keccak digests (Bytes32.SIZE * 8 bits of output) updated for outgoing and incoming data respectively.
private final KeccakDigest egressMac = new KeccakDigest(Bytes32.SIZE * 8);
private final KeccakDigest ingressMac = new KeccakDigest(Bytes32.SIZE * 8);
// Our own public key and the remote peer's public key, exposed through publicKey() and peerPublicKey().
private final SECP256K1.PublicKey publicKey;
private final SECP256K1.PublicKey peerPublicKey;
// AES engine initialised for encryption with the MAC secret; used while computing frame MACs.
private final AESEngine macEncryptionEngine;
// Whether message payloads are Snappy-compressed; decided in configureAfterHandshake.
private boolean applySnappyCompression = false;
// Bytes received through stream() that have not yet been consumed as a complete frame.
private Bytes buffer = Bytes.EMPTY;
// Constructor parameter list and body: stores the shared secrets, the token and both public keys, and prepares the
// AES engine used for MAC computation.
// NOTE(review): the declaration line that opens this parameter list is not visible in this view.
Bytes32 aesSecret,
Bytes32 macSecret,
Bytes32 token,
Bytes egressMac,
Bytes ingressMac,
SECP256K1.PublicKey publicKey,
SECP256K1.PublicKey peerPublicKey) {
this.aesSecret = aesSecret;
this.macSecret = macSecret;
this.token = token;
// NOTE(review): the egressMac and ingressMac arguments are not used in the visible lines; presumably they seed the
// two MAC digests of the same name — confirm.
// The MAC secret keys a raw AES engine in encryption mode (first argument of init is true); readFrame and write use
// it through processBlock when computing frame MACs.
KeyParameter macKey = new KeyParameter(macSecret.toArrayUnsafe());
macEncryptionEngine = new AESEngine();
macEncryptionEngine.init(true, macKey);
this.publicKey = publicKey;
this.peerPublicKey = peerPublicKey;
* @return our public key associated with this connection
public SECP256K1.PublicKey publicKey() {
return publicKey;
* @return the public key of the peer associated with this connection
public SECP256K1.PublicKey peerPublicKey() {
return peerPublicKey;
public void configureAfterHandshake(HelloMessage helloMessage) {
this.applySnappyCompression = helloMessage.p2pVersion() >= 5;
public synchronized void stream(Bytes newBytes, Consumer<RLPxMessage> messageConsumer) {
buffer = Bytes.concatenate(buffer, newBytes);
RLPxMessage message = null;
do {
message = readFrame(buffer);
if (message != null) {
buffer = buffer.slice(message.bytesLength());
} while (buffer.size() != 0 && message != null);
/**
 * Attempts to decode a single frame found at the start of the given bytes.
 * <p>
 * The layout read here is: 16 bytes of encrypted header, 16 bytes of header MAC, the encrypted frame data padded to a
 * multiple of 16 bytes, and 16 bytes of frame MAC. Both MACs are checked against the ingress MAC digest, which this
 * method advances once a complete frame is available. An InvalidMACException is thrown when either MAC does not
 * match, and an IllegalArgumentException when Snappy decompression fails.
 *
 * @param messageFrame the bytes to read from; the frame is expected to start at offset 0
 * @return the decoded message, or null if the bytes do not yet hold a complete frame
 */
public RLPxMessage readFrame(Bytes messageFrame) {
// Fewer than 32 bytes cannot even hold the 16-byte header plus its 16-byte MAC.
if (messageFrame.size() < 32) {
return null;
// A fresh AES cipher in SIC (counter) mode with an all-zero IV is created for every call.
// NOTE(review): the keystream therefore restarts for each frame — confirm this matches what the remote peer sends.
KeyParameter aesKey = new KeyParameter(aesSecret.toArrayUnsafe());
byte[] IV = new byte[16];
Arrays.fill(IV, (byte) 0);
SICBlockCipher decryptionCipher = new SICBlockCipher(new AESEngine());
decryptionCipher.init(false, new ParametersWithIV(aesKey, IV));
Bytes macBytes = messageFrame.slice(16, 16);
Bytes headerBytes = messageFrame.slice(0, 16);
Bytes decryptedHeader = Bytes.wrap(new byte[16]);
decryptionCipher.processBytes(headerBytes.toArrayUnsafe(), 0, 16, decryptedHeader.toArrayUnsafe(), 0);
// The first three decrypted header bytes hold the frame size as a big-endian unsigned integer.
int frameSize = decryptedHeader.get(0) & 0xff;
frameSize = (frameSize << 8) + (decryptedHeader.get(1) & 0xff);
frameSize = (frameSize << 8) + (decryptedHeader.get(2) & 0xff);
// Frame data is padded up to the next multiple of 16 bytes.
int pad = frameSize % 16 == 0 ? 0 : 16 - frameSize % 16;
// Bail out before touching the ingress MAC: calculateMac below updates the digest, so it must only run once the
// whole frame (header, header MAC, padded data, frame MAC) is present.
if (messageFrame.size() < 32 + frameSize + pad + 16) {
return null;
Bytes expectedMac = calculateMac(headerBytes, true);
// NOTE(review): consider a constant-time comparison for MAC checks — confirm how Bytes.equals compares.
if (!macBytes.equals(expectedMac)) {
// NOTE(review): the remaining arguments and the end of this statement are not visible in this view.
throw new InvalidMACException(
"Header MAC did not match expected MAC; expected: %s, received: %s",
Bytes frameData = messageFrame.slice(32, frameSize);
Bytes frameMac = messageFrame.slice(32 + frameSize + pad, 16);
Bytes newFrameMac = Bytes.wrap(new byte[16]);
// Frame MAC: feed the encrypted data including padding into the ingress digest, encrypt the first block of the
// resulting snapshot with the MAC engine, XOR that with the first 16 snapshot bytes, feed the result into the
// digest again and keep the first 16 bytes of the new snapshot.
Bytes frameMacSeed = updateIngress(messageFrame.slice(32, frameSize + pad));
macEncryptionEngine.processBlock(frameMacSeed.toArrayUnsafe(), 0, newFrameMac.toArrayUnsafe(), 0);
Bytes expectedFrameMac = updateIngress(newFrameMac.xor(frameMacSeed.slice(0, 16))).slice(0, 16);
if (!expectedFrameMac.equals(frameMac)) {
// NOTE(review): the remaining arguments and the end of this statement are not visible in this view.
throw new InvalidMACException(
"Frame MAC did not match expected MAC; expected: %s, received: %s",
Bytes decryptedFrameData = Bytes.wrap(new byte[frameData.size()]);
// NOTE(review): the receiver of this call is not visible in this view; presumably decryptionCipher — confirm.
.processBytes(frameData.toArrayUnsafe(), 0, frameData.size(), decryptedFrameData.toArrayUnsafe(), 0);
// The first decrypted byte is the RLP-encoded message type; everything after it is the message payload.
int messageType = RLP.decodeInt(decryptedFrameData.slice(0, 1));
Bytes messageData = decryptedFrameData.slice(1);
if (applySnappyCompression) {
try {
messageData = Bytes.wrap(Snappy.uncompress(messageData.toArrayUnsafe()));
} catch (IOException e) {
// Decompression failures are rethrown unchecked, keeping the original exception as the cause.
throw new IllegalArgumentException(e);
// The third argument is the total number of bytes this frame occupied; presumably what bytesLength() reports, which
// stream() uses to drop the frame from its buffer — confirm against RLPxMessage.
return new RLPxMessage(messageType, messageData, 32 + frameSize + pad + 16);
/**
 * Frames a message for sending to an RLPx peer, encrypting it and calculating the appropriate MACs.
 *
 * @param message The message to frame.
 * @return The framed message, as byte buffer.
 */
public Bytes write(RLPxMessage message) {
// Builds one frame made of: encrypted 16-byte header, header MAC, encrypted padded payload, payload MAC.
// A fresh AES cipher in SIC (counter) mode with an all-zero IV is created for every call.
// NOTE(review): the keystream therefore restarts for each frame — confirm this is what the remote peer expects.
KeyParameter aesKey = new KeyParameter(aesSecret.toArrayUnsafe());
byte[] IV = new byte[16];
Arrays.fill(IV, (byte) 0);
SICBlockCipher encryptionCipher = new SICBlockCipher(new AESEngine());
encryptionCipher.init(true, new ParametersWithIV(aesKey, IV));
// Compress message
Bytes messageData = message.content();
if (applySnappyCompression) {
try {
messageData = Bytes.wrap(Snappy.compress(messageData.toArrayUnsafe()));
} catch (IOException e) {
// Compression failures are rethrown unchecked, keeping the original exception as the cause.
throw new IllegalArgumentException(e);
// The frame size counts the payload plus one byte for the message id; padding aligns it to a multiple of 16.
int frameSize = messageData.size() + 1;
int pad = frameSize % 16 == 0 ? 0 : 16 - frameSize % 16;
// Generate the header data.
// The frame size is written as a 3-byte big-endian integer.
MutableBytes frameSizeBytes = MutableBytes.create(3);
frameSizeBytes.set(0, (byte) ((frameSize >> 16) & 0xff));
frameSizeBytes.set(1, (byte) ((frameSize >> 8) & 0xff));
frameSizeBytes.set(2, (byte) (frameSize & 0xff));
// NOTE(review): the body of this lambda (the contents of the RLP list) is not visible in this view.
Bytes protocolHeader = RLP.encodeList(writer -> {
// Zero bytes fill the header up to 16 bytes after the size and the protocol header.
byte[] zeros = new byte[16 - frameSizeBytes.size() - protocolHeader.size()];
Arrays.fill(zeros, (byte) 0x00);
Bytes headerBytes = Bytes.concatenate(frameSizeBytes, protocolHeader, Bytes.wrap(zeros));
Bytes encryptedHeaderBytes = Bytes.wrap(new byte[16]);
encryptionCipher.processBytes(headerBytes.toArrayUnsafe(), 0, 16, encryptedHeaderBytes.toArrayUnsafe(), 0);
// The header MAC is computed over the encrypted header and advances the egress MAC digest.
Bytes headerMac = calculateMac(encryptedHeaderBytes, false);
Bytes idBytes = RLP.encodeInt(message.messageId());
// The frame size above reserves exactly one byte for the id, so its RLP encoding must be a single byte.
assert idBytes.size() == 1;
Bytes encryptedPayload = Bytes.wrap(new byte[idBytes.size() + messageData.size() + pad]);
// NOTE(review): the call this argument belongs to is not visible in this view; presumably the id, the payload and
// the zero padding are encrypted into encryptedPayload — confirm.
Bytes.concatenate(idBytes, messageData, Bytes.wrap(new byte[pad])).toArrayUnsafe(),
// Calculate the frame MAC.
// Same scheme as on the reading side: digest the encrypted payload, encrypt the first 16 snapshot bytes with the MAC
// engine, XOR with those 16 bytes, digest the result and keep the first 16 bytes of the new snapshot.
Bytes payloadMacSeed = updateEgress(encryptedPayload).slice(0, 16);
Bytes payloadMac = Bytes.wrap(new byte[16]);
macEncryptionEngine.processBlock(payloadMacSeed.toArrayUnsafe(), 0, payloadMac.toArrayUnsafe(), 0);
payloadMac = updateEgress(payloadMacSeed.xor(payloadMac)).slice(0, 16);
Bytes finalBytes = Bytes.concatenate(encryptedHeaderBytes, headerMac, encryptedPayload, payloadMac);
return finalBytes;
/**
 * Computes a 16-byte MAC for the given input and advances the selected MAC digest.
 * <p>
 * Within this file it is called with the encrypted frame header, both when reading (ingress) and when writing
 * (egress).
 *
 * @param input the bytes to authenticate; XORed with the intermediate 16-byte value
 * @param ingress true to use and update the ingress MAC digest, false for the egress MAC digest
 * @return the first 16 bytes of the digest snapshot taken after the update
 */
private Bytes calculateMac(Bytes input, boolean ingress) {
Bytes mac = Bytes.wrap(new byte[16]);
// NOTE(review): the call this argument belongs to is not visible in this view; presumably the first 16 bytes of the
// current digest snapshot are encrypted by macEncryptionEngine into mac — confirm.
snapshot(ingress ? ingressMac : egressMac).slice(0, 16).toArrayUnsafe(),
mac = mac.xor(input);
// Feeding the XORed value into the digest changes the running MAC state; the new snapshot supplies the result.
if (ingress) {
mac = updateIngress(mac).slice(0, 16);
} else {
mac = updateEgress(mac).slice(0, 16);
return mac.slice(0, 16);
private Bytes32 updateEgress(Bytes bytes) {
egressMac.update(bytes.toArrayUnsafe(), 0, bytes.size());
return snapshot(egressMac);
private Bytes32 updateIngress(Bytes bytes) {
ingressMac.update(bytes.toArrayUnsafe(), 0, bytes.size());
return snapshot(ingressMac);
public boolean equals(Object obj) {
if (this == obj) {
return true;
if (obj == null || getClass() != obj.getClass()) {
return false;
RLPxConnection that = (RLPxConnection) obj;
return Objects.equals(aesSecret, that.aesSecret)
&& Objects.equals(macSecret, that.macSecret)
&& Objects.equals(token, that.token)
&& Objects.equals(snapshot(egressMac), snapshot(that.egressMac))
&& Objects.equals(snapshot(ingressMac), snapshot(that.ingressMac));
public int hashCode() {
return Objects
.hash(Objects.hashCode(aesSecret), Objects.hashCode(macSecret), Objects.hashCode(token), egressMac, ingressMac);